diff --git a/.config/dictionaries/project.dic b/.config/dictionaries/project.dic index 4a2b0d577..698a0317f 100644 --- a/.config/dictionaries/project.dic +++ b/.config/dictionaries/project.dic @@ -219,6 +219,7 @@ txos unfinalized unlinkat unsub +unsubscription usermod usvg utimensat @@ -240,4 +241,4 @@ xprivate xprv xpub zilla -zillable \ No newline at end of file +zillable diff --git a/hermes/bin/Cargo.toml b/hermes/bin/Cargo.toml index 17fb4b96e..be5a2d771 100644 --- a/hermes/bin/Cargo.toml +++ b/hermes/bin/Cargo.toml @@ -31,7 +31,7 @@ path = "tests/integration/tests/mod.rs" [dependencies] # Catalyst Internal Crates -hermes-ipfs = { version = "0.0.12", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "hermes-ipfs/v0.0.12", features = ["doc-sync"] } +hermes-ipfs = { version = "0.0.13", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "hermes-ipfs/v0.0.13", features = ["doc-sync"] } cardano-blockchain-types = { version = "0.0.9", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "cardano-blockchain-types/v0.0.9" } cardano-chain-follower = { version = "0.0.19", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "cardano-chain-follower/v0.0.19" } catalyst-types = { version = "0.0.12", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "catalyst-types/v0.0.12" } diff --git a/hermes/bin/src/ipfs/api.rs b/hermes/bin/src/ipfs/api.rs index b46003c00..8ba5db4af 100644 --- a/hermes/bin/src/ipfs/api.rs +++ b/hermes/bin/src/ipfs/api.rs @@ -13,6 +13,7 @@ use crate::{ }, hermes::doc_sync, }, + subscribe_to_topic, unsubscribe_from_topic, wasm::module::ModuleId, }; @@ -144,38 +145,19 @@ pub(crate) fn hermes_ipfs_dht_get_providers( } /// Returns the peer id of the node. -pub(crate) fn hermes_ipfs_get_peer_identity( - app_name: &ApplicationName, - peer: Option, -) -> Result { +pub(crate) async fn hermes_ipfs_get_peer_identity( + peer: Option +) -> Result, Errno> { let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?; - let res = if tokio::runtime::Handle::try_current().is_ok() { - tracing::debug!("identity with existing Tokio runtime"); - - let (tx, rx) = std::sync::mpsc::channel(); - - tokio::task::spawn_blocking(move || { - let handle = tokio::runtime::Handle::current(); - let res = handle.block_on(ipfs.get_peer_identity(peer)); - drop(tx.send(res)); - }); - - rx.recv().map_err(|_| Errno::PubsubPublishError) - } else { - tracing::debug!("identity without existing Tokio runtime"); - let rt = tokio::runtime::Runtime::new().map_err(|_| Errno::ServiceUnavailable)?; + let identity = ipfs.get_peer_identity(peer).await?; + tracing::debug!("Got peer identity"); - Ok(rt.block_on(ipfs.get_peer_identity(peer))) - }??; - - tracing::debug!(app_name = %app_name, "Got peer identity"); - - Ok(res) + Ok(identity) } -/// Subscribe to a topic -pub(crate) fn hermes_ipfs_subscribe( +/// Subscribe to a topic. +pub(crate) async fn hermes_ipfs_subscribe( kind: SubscriptionKind, app_name: &ApplicationName, tree: Option>>>, @@ -184,42 +166,21 @@ pub(crate) fn hermes_ipfs_subscribe( ) -> Result { let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?; tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "subscribing to PubSub topic"); - let module_ids_owned = module_ids.cloned(); - if ipfs.apps.topic_subscriptions_contains(kind, topic) { - tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "topic subscription stream already exists"); - } else { - let topic_owned = topic.clone(); - let app_name_owned = app_name.clone(); - let handle = if let Ok(rt) = tokio::runtime::Handle::try_current() { - tracing::debug!("subscribe with existing Tokio runtime"); - let (tx, rx) = std::sync::mpsc::channel(); - tokio::task::spawn_blocking(move || { - let res = rt.block_on(ipfs.pubsub_subscribe( - kind, - &topic_owned, - tree, - &app_name_owned, - module_ids_owned, - )); - drop(tx.send(res)); - }); - rx.recv().map_err(|_| Errno::PubsubSubscribeError)?? - } else { - tracing::debug!("subscribe without existing Tokio runtime"); - let rt = tokio::runtime::Runtime::new().map_err(|_| Errno::ServiceUnavailable)?; - rt.block_on(ipfs.pubsub_subscribe(kind, topic, tree, app_name, module_ids_owned))? - }; - ipfs.apps.added_topic_stream(kind, topic.clone(), handle); - tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "added subscription topic stream"); - } - ipfs.apps - .added_app_topic_subscription(kind, app_name.clone(), topic.clone()); + subscribe_to_topic!( + ipfs, + kind, + app_name, + topic, + ipfs.pubsub_subscribe(kind, topic, tree, app_name, module_ids) + .await + ); + Ok(true) } /// Unsubscribe from a topic -pub(crate) fn hermes_ipfs_unsubscribe( +pub(crate) async fn hermes_ipfs_unsubscribe( kind: SubscriptionKind, app_name: &ApplicationName, topic: &PubsubTopic, @@ -227,34 +188,19 @@ pub(crate) fn hermes_ipfs_unsubscribe( let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?; tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "unsubscribing from PubSub topic"); - if ipfs.apps.topic_subscriptions_contains(kind, topic) { - let topic_owned = topic.clone(); - if let Ok(rt) = tokio::runtime::Handle::try_current() { - tracing::debug!("unsubscribe with existing Tokio runtime"); - let (tx, rx) = std::sync::mpsc::channel(); - tokio::task::spawn_blocking(move || { - let res = rt.block_on(ipfs.pubsub_unsubscribe(&topic_owned)); - let _ = tx.send(res); - }); - rx.recv().map_err(|_| Errno::PubsubUnsubscribeError)??; - } else { - tracing::debug!("unsubscribe without existing Tokio runtime"); - let rt = tokio::runtime::Runtime::new().map_err(|_| Errno::ServiceUnavailable)?; - rt.block_on(ipfs.pubsub_unsubscribe(topic))?; - } + unsubscribe_from_topic!( + ipfs, + kind, + app_name, + topic, + ipfs.pubsub_unsubscribe(topic).await + ); - ipfs.apps.removed_topic_stream(kind, topic); - tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "removed subscription topic stream"); - } else { - tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "topic subscription does not exist"); - } - ipfs.apps - .removed_app_topic_subscription(kind, app_name, topic); Ok(true) } /// Publish message to a topic -pub(crate) fn hermes_ipfs_publish( +pub(crate) async fn hermes_ipfs_publish( app_name: &ApplicationName, topic: &PubsubTopic, message: MessageData, @@ -269,26 +215,7 @@ pub(crate) fn hermes_ipfs_publish( "📤 Publishing PubSub message" ); - let res = if tokio::runtime::Handle::try_current().is_ok() { - tracing::debug!("publish with existing Tokio runtime"); - - let (tx, rx) = std::sync::mpsc::channel(); - let topic_owned = topic.clone(); - - tokio::task::spawn_blocking(move || { - let handle = tokio::runtime::Handle::current(); - let res = handle.block_on(ipfs.pubsub_publish(topic_owned, message)); - let _ = tx.send(res); - }); - - rx.recv().map_err(|_| Errno::PubsubPublishError) - } else { - tracing::debug!("publish without existing Tokio runtime"); - - let rt = tokio::runtime::Runtime::new().map_err(|_| Errno::ServiceUnavailable)?; - - Ok(rt.block_on(ipfs.pubsub_publish(topic.to_string(), message))) - }?; + let res = ipfs.pubsub_publish(topic, message).await; match &res { Ok(()) => { diff --git a/hermes/bin/src/ipfs/blocking.rs b/hermes/bin/src/ipfs/blocking.rs new file mode 100644 index 000000000..9521a7909 --- /dev/null +++ b/hermes/bin/src/ipfs/blocking.rs @@ -0,0 +1,197 @@ +//! Hermes IPFS service. +//! +//! This module contains the blocking alternatives to IPFS function as opposed to async +//! enabled functions. + +use std::{ + convert::Infallible, + sync::{Arc, Mutex}, +}; + +use catalyst_types::smt::Tree; +use tokio::{sync::oneshot, task::JoinHandle}; + +use super::task::IpfsCommand; +pub(crate) use super::task::SubscriptionKind; +use crate::{ + app::ApplicationName, + ipfs::{HERMES_IPFS, HermesIpfsNode}, + runtime_extensions::{ + bindings::hermes::ipfs::api::{Errno, MessageData, PeerId, PubsubTopic}, + hermes::{self, doc_sync}, + }, + subscribe_to_topic, unsubscribe_from_topic, + wasm::module::ModuleId, +}; + +impl HermesIpfsNode +where N: hermes_ipfs::rust_ipfs::NetworkBehaviour + Send + Sync +{ + /// Get the peer identity in a non-async context. + pub(super) fn get_peer_identity_blocking( + &self, + peer: Option, + ) -> Result, Errno> { + let (cmd_tx, cmd_rx) = oneshot::channel(); + self.sender + .as_ref() + .ok_or(Errno::GetPeerIdError)? + .blocking_send(IpfsCommand::Identity(peer, cmd_tx)) + .map_err(|_| Errno::GetPeerIdError)?; + cmd_rx.blocking_recv().map_err(|_| Errno::GetPeerIdError)? + } + + /// Publish message to a `PubSub` topic in the non-async context. + pub(super) fn pubsub_publish_blocking( + &self, + topic: &PubsubTopic, + message: MessageData, + ) -> Result<(), Errno> { + let (cmd_tx, cmd_rx) = oneshot::channel(); + self.sender + .as_ref() + .ok_or(Errno::PubsubPublishError)? + .blocking_send(IpfsCommand::Publish(topic.clone(), message, cmd_tx)) + .map_err(|_| Errno::PubsubPublishError)?; + cmd_rx + .blocking_recv() + .map_err(|_| Errno::PubsubPublishError)? + } + + /// Subscribe to a `PubSub` topic in a non-async context. + pub(super) fn pubsub_subscribe_blocking( + &self, + kind: SubscriptionKind, + topic: &PubsubTopic, + tree: Option>>>, + app_name: &ApplicationName, + module_ids: Option<&Vec>, + ) -> Result, Errno> { + let (cmd_tx, cmd_rx) = oneshot::channel(); + let module_ids_owned = module_ids.cloned(); + self.sender + .as_ref() + .ok_or(Errno::PubsubSubscribeError)? + .blocking_send(IpfsCommand::Subscribe( + topic.clone(), + kind, + tree, + app_name.clone(), + module_ids_owned, + cmd_tx, + )) + .map_err(|_| Errno::PubsubSubscribeError)?; + cmd_rx + .blocking_recv() + .map_err(|_| Errno::PubsubSubscribeError)? + } + + /// Unsubscribe from a `PubSub` topic in the non-async context + pub(super) fn pubsub_unsubscribe_blocking( + &self, + topic: &PubsubTopic, + ) -> Result<(), Errno> { + let (cmd_tx, cmd_rx) = oneshot::channel(); + self.sender + .as_ref() + .ok_or(Errno::PubsubUnsubscribeError)? + .blocking_send(IpfsCommand::Unsubscribe(topic.clone(), cmd_tx)) + .map_err(|_| Errno::PubsubUnsubscribeError)?; + cmd_rx + .blocking_recv() + .map_err(|_| Errno::PubsubUnsubscribeError)? + } +} + +/// Returns the peer id of the node in the non-async context. +pub(crate) fn hermes_ipfs_get_peer_identity( + peer: Option +) -> Result, Errno> { + let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?; + + let identity = ipfs.get_peer_identity_blocking(peer)?; + tracing::debug!("Got peer identity"); + + Ok(identity) +} + +/// Subscribe to a topic in the non-async context. +pub(crate) fn hermes_ipfs_subscribe( + kind: SubscriptionKind, + app_name: &ApplicationName, + tree: Option>>>, + topic: &PubsubTopic, + module_ids: Option<&Vec>, +) -> Result { + let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?; + tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "subscribing to PubSub topic"); + + subscribe_to_topic!( + ipfs, + kind, + app_name, + topic, + ipfs.pubsub_subscribe_blocking(kind, topic, tree, app_name, module_ids) + ); + + Ok(true) +} + +/// Unsubscribe from a topic in the non-async context +pub(crate) fn hermes_ipfs_unsubscribe( + kind: SubscriptionKind, + app_name: &ApplicationName, + topic: &PubsubTopic, +) -> Result { + let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?; + tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "unsubscribing from PubSub topic"); + + unsubscribe_from_topic!( + ipfs, + kind, + app_name, + topic, + ipfs.pubsub_unsubscribe_blocking(topic) + ); + + Ok(true) +} + +/// Publish message to a topic in the non-async context. +pub(crate) fn hermes_ipfs_publish( + app_name: &ApplicationName, + topic: &PubsubTopic, + message: MessageData, +) -> Result<(), Errno> { + let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?; + + // Log publish attempt with message size + tracing::info!( + app_name = %app_name, + topic = %topic, + message_size = message.len(), + "📤 Publishing PubSub message" + ); + + let res = ipfs.pubsub_publish_blocking(topic, message); + + match &res { + Ok(()) => { + tracing::info!( + app_name = %app_name, + topic = %topic, + "✅ PubSub publish succeeded" + ); + }, + Err(e) => { + tracing::error!( + app_name = %app_name, + topic = %topic, + error = ?e, + "❌ PubSub publish failed" + ); + }, + } + + res +} diff --git a/hermes/bin/src/ipfs/doc_sync/reconciliation.rs b/hermes/bin/src/ipfs/doc_sync/reconciliation.rs index 82aeb9181..8f6730bc7 100644 --- a/hermes/bin/src/ipfs/doc_sync/reconciliation.rs +++ b/hermes/bin/src/ipfs/doc_sync/reconciliation.rs @@ -9,8 +9,9 @@ use minicbor::{Encode, Encoder}; use crate::{ app::ApplicationName, ipfs::{ - self, api::hermes_ipfs_unsubscribe, hermes_ipfs_get_peer_identity, hermes_ipfs_publish, - hermes_ipfs_subscribe, + self, + api::{hermes_ipfs_subscribe, hermes_ipfs_unsubscribe}, + hermes_ipfs_get_peer_identity, hermes_ipfs_publish, }, runtime_extensions::{ bindings::hermes::ipfs::api::PeerId, @@ -47,7 +48,7 @@ pub(crate) struct DocReconciliationData { } /// Starts the document reconciliation process. -pub(super) fn start_reconciliation( +pub(super) async fn start_reconciliation( doc_reconciliation_data: DocReconciliationData, app_name: &ApplicationName, tree: Arc>>, @@ -55,14 +56,14 @@ pub(super) fn start_reconciliation( module_ids: Option<&Vec>, peer: Option, ) -> anyhow::Result<()> { - subscribe_to_dif(app_name, tree, channel, module_ids)?; + subscribe_to_dif(app_name, tree, channel, module_ids).await?; tracing::info!(%channel, "subscribed to .dif"); - let syn_payload = make_syn_payload(doc_reconciliation_data, app_name, peer); + let syn_payload = make_syn_payload(doc_reconciliation_data, peer).await; tracing::info!("SYN payload created"); - if let Err(err) = send_syn_payload(&syn_payload, app_name, channel) { - unsubscribe_from_dif(app_name, channel)?; + if let Err(err) = send_syn_payload(&syn_payload, app_name, channel).await { + unsubscribe_from_dif(app_name, channel).await?; tracing::info!(%channel, "unsubscribed from .dif"); return Err(err); } @@ -72,7 +73,7 @@ pub(super) fn start_reconciliation( } /// Subscribes to ".dif" topic in order to receive responses for the ".syn" requests. -fn subscribe_to_dif( +async fn subscribe_to_dif( app_name: &ApplicationName, tree: Arc>>, channel: &str, @@ -85,22 +86,23 @@ fn subscribe_to_dif( Some(tree), &topic, module_ids, - )?; + ) + .await?; Ok(()) } /// Unsubscribes from ".dif" topic. -fn unsubscribe_from_dif( +async fn unsubscribe_from_dif( app_name: &ApplicationName, channel: &str, ) -> anyhow::Result<()> { let topic = format!("{channel}.dif"); - hermes_ipfs_unsubscribe(ipfs::SubscriptionKind::DocSync, app_name, &topic)?; + hermes_ipfs_unsubscribe(ipfs::SubscriptionKind::DocSync, app_name, &topic).await?; Ok(()) } /// Creates the new SYN payload. -fn make_syn_payload( +async fn make_syn_payload( DocReconciliationData { our_root, our_count, @@ -108,10 +110,10 @@ fn make_syn_payload( their_root, their_count, }: DocReconciliationData, - app_name: &ApplicationName, peer: Option, ) -> MsgSyn { - let public_key = hermes_ipfs_get_peer_identity(app_name, peer) + let public_key = hermes_ipfs_get_peer_identity(peer) + .await .map_err(|err| { tracing::info!( %err, @@ -119,6 +121,7 @@ fn make_syn_payload( ); }) .ok() + .flatten() .and_then(|peer_info| { peer_info .public_key @@ -158,7 +161,7 @@ fn make_syn_payload( } /// Sends the SYN payload to request the reconciliation data. -fn send_syn_payload( +async fn send_syn_payload( payload: &MsgSyn, app_name: &ApplicationName, channel: &str, @@ -169,7 +172,7 @@ fn send_syn_payload( payload .encode(&mut enc, &mut ()) .map_err(|e| anyhow::anyhow!("Failed to encode syn_payload::MsgSyn: {e}"))?; - hermes_ipfs_publish(app_name, &topic, payload_bytes)?; + hermes_ipfs_publish(app_name, &topic, payload_bytes).await?; Ok(()) } diff --git a/hermes/bin/src/ipfs/doc_sync/topic_handler.rs b/hermes/bin/src/ipfs/doc_sync/topic_handler.rs index 30a53509e..7ff884ff2 100644 --- a/hermes/bin/src/ipfs/doc_sync/topic_handler.rs +++ b/hermes/bin/src/ipfs/doc_sync/topic_handler.rs @@ -10,34 +10,33 @@ use crate::ipfs::{ }; /// A helper trait to handle the IPFS messages of a specific topic. -pub(crate) trait TopicHandler<'a>: Sized -where Self: minicbor::Decode<'a, ()> -{ +pub(crate) trait TopicHandler: Sized { /// A suffix of the IPFS topic to which the handler is subscribed const TOPIC_SUFFIX: &'static str; /// Decodes the payload of the IPFS message. - fn decode(payload: &'a [u8]) -> Result { + fn decode<'a>(payload: &'a [u8]) -> Result + where Self: minicbor::Decode<'a, ()> { minicbor::decode::(payload) } /// Handles the IPFS message. - fn handle( + async fn handle( self, topic: &str, source: Option, - context: &TopicMessageContext, + context: TopicMessageContext, ) -> anyhow::Result<()>; } -impl TopicHandler<'_> for payload::New { +impl TopicHandler for payload::New { const TOPIC_SUFFIX: &'static str = ".new"; - fn handle( + async fn handle( self, topic: &str, source: Option, - context: &TopicMessageContext, + context: TopicMessageContext, ) -> anyhow::Result<()> { let Some(tree) = context.tree() else { return Err(anyhow::anyhow!( @@ -89,7 +88,9 @@ impl TopicHandler<'_> for payload::New { channel_name, context.module_ids(), source.map(|p| p.to_string()), - ) { + ) + .await + { return Err(anyhow::anyhow!( "Failed to start reconciliation: {err}", )); @@ -132,10 +133,10 @@ impl TopicHandler<'_> for payload::New { } /// Handles the IPFS messages of a specific topic. -pub(crate) fn handle_doc_sync_topic<'a, TH: TopicHandler<'a>>( +pub(crate) async fn handle_doc_sync_topic<'a, TH: TopicHandler + minicbor::Decode<'a, ()>>( message: &'a hermes_ipfs::rust_ipfs::GossipsubMessage, topic: &str, - context: &TopicMessageContext, + context: TopicMessageContext, ) -> Option> { if !topic.ends_with(TH::TOPIC_SUFFIX) { return None; @@ -143,7 +144,7 @@ pub(crate) fn handle_doc_sync_topic<'a, TH: TopicHandler<'a>>( let decoded = ::decode(&message.data); match decoded { - Ok(handler) => Some(handler.handle(topic, message.source, context)), + Ok(handler) => Some(handler.handle(topic, message.source, context).await), Err(err) => { Some(Err(anyhow::anyhow!( "Failed to decode payload from IPFS message on topic {topic}: {err}" diff --git a/hermes/bin/src/ipfs/mod.rs b/hermes/bin/src/ipfs/mod.rs index 36a67e194..d94879e0e 100644 --- a/hermes/bin/src/ipfs/mod.rs +++ b/hermes/bin/src/ipfs/mod.rs @@ -1,7 +1,20 @@ //! Hermes IPFS service. +//! +//! Why DHT Server Mode is Required: +//! - DHT (Distributed Hash Table) server mode makes this node actively participate in the +//! DHT by storing and serving routing information +//! - This is REQUIRED for Gossipsub `PubSub` to work properly because: +//! 1. `PubSub` uses the DHT to discover which peers are subscribed to topics +//! 2. Gossipsub builds mesh connections based on DHT peer discovery +//! 3. Without server mode, the node would be a "leech" that can't help other peers +//! discover the network, weakening the mesh +//! - All Hermes nodes should be DHT servers to form a robust P2P network + mod api; +pub(crate) mod blocking; mod doc_sync; mod task; +mod topic_handlers; mod topic_message_context; use std::{ @@ -34,8 +47,7 @@ pub(crate) use api::{ hermes_ipfs_add_file, hermes_ipfs_content_validate, hermes_ipfs_dht_get_providers, hermes_ipfs_dht_provide, hermes_ipfs_evict_peer, hermes_ipfs_get_dht_value, hermes_ipfs_get_file, hermes_ipfs_get_peer_identity, hermes_ipfs_pin_file, hermes_ipfs_publish, - hermes_ipfs_put_dht_value, hermes_ipfs_subscribe, hermes_ipfs_unpin_file, - hermes_ipfs_unsubscribe, + hermes_ipfs_put_dht_value, hermes_ipfs_unpin_file, }; use catalyst_types::smt::Tree; use dashmap::DashMap; @@ -47,7 +59,7 @@ use once_cell::sync::OnceCell; pub(crate) use task::SubscriptionKind; use task::{IpfsCommand, ipfs_command_handler}; use tokio::{ - runtime::Builder, + runtime::{Builder, Runtime}, sync::{mpsc, oneshot}, task::JoinHandle, }; @@ -426,10 +438,21 @@ where N: hermes_ipfs::rust_ipfs::NetworkBehaviour + Send + sender: Option>, /// State related to `ApplicationName` apps: AppIpfsState, + /// Tokio runtime kept alive by the Hermes IPFS node + _tokio_runtime: Runtime, /// Phantom data. _phantom_data: PhantomData, } +impl Drop for HermesIpfsNode +where N: hermes_ipfs::rust_ipfs::NetworkBehaviour + Send + Sync +{ + fn drop(&mut self) { + // Close the sender to signal shutdown + self.sender.take(); + } +} + impl HermesIpfsNode where N: hermes_ipfs::rust_ipfs::NetworkBehaviour + Send + Sync { @@ -439,64 +462,60 @@ where N: hermes_ipfs::rust_ipfs::NetworkBehaviour + Send + default_bootstrap: bool, custom_peers: Option>, ) -> anyhow::Result { - let runtime = Builder::new_multi_thread().enable_all().build()?; - let (sender, receiver) = mpsc::channel(1); + let tokio_runtime = Builder::new_multi_thread().enable_all().build()?; - // Build and start IPFS node, before moving into the thread - let node = runtime.block_on(async move { builder.start().await })?; + // Create a channel to send commands to the IPFS node + let (command_sender, command_receiver) = mpsc::channel(1); // Create a oneshot channel to signal when the command handler is ready let (ready_tx, ready_rx) = oneshot::channel(); - let _handle = std::thread::spawn(move || { - let result = runtime.block_on(async move { - // Configure listening address for P2P connections - configure_listening_address(&node).await; - - // Connect to bootstrap peers - if let Some(peers) = custom_peers { - connect_to_bootstrap_peers(&node, peers).await; - } else if default_bootstrap { - // Use public IPFS bootstrap nodes - let addresses = node.default_bootstrap().await?; - node.bootstrap().await?; - tracing::debug!( - "Bootstrapped IPFS node with default addresses: {:?}", - addresses - ); - } + // Start the async block + tokio_runtime.block_on(async { + // Spin-up the IPFS node + let node = builder.start().await?; + configure_listening_address(&node).await; + + // Connect to bootstrap peers + if let Some(peers) = custom_peers { + connect_to_bootstrap_peers(&node, peers).await; + } else if default_bootstrap { + // Use public IPFS bootstrap nodes + let addresses = node.default_bootstrap().await?; + node.bootstrap().await?; + tracing::debug!( + "Bootstrapped IPFS node with default addresses: {:?}", + addresses + ); + } - // Why DHT Server Mode is Required: - // - DHT (Distributed Hash Table) server mode makes this node actively participate - // in the DHT by storing and serving routing information - // - This is REQUIRED for Gossipsub PubSub to work properly because: - // 1. PubSub uses the DHT to discover which peers are subscribed to topics - // 2. Gossipsub builds mesh connections based on DHT peer discovery - // 3. Without server mode, the node would be a "leech" that can't help other peers - // discover the network, weakening the mesh - // - All Hermes nodes should be DHT servers to form a robust P2P network - let hermes_node: HermesIpfs = node.into(); - hermes_node - .dht_mode(hermes_ipfs::rust_ipfs::DhtMode::Server) - .await?; - tracing::debug!("IPFS node set to DHT server mode"); - - // Start command handler - - // Signal that the command handler is about to start - // Ignore the error if the receiver was dropped - let _ = ready_tx.send(()); + // Build the Hermes wrapper around IPFS node + let ipfs: HermesIpfs = node.into(); + ipfs.dht_mode(hermes_ipfs::rust_ipfs::DhtMode::Server) + .await?; + tracing::debug!("IPFS node set to DHT server mode"); - // Start command handler - let h = tokio::spawn(ipfs_command_handler(hermes_node, receiver)); - let (..) = tokio::join!(h); - Ok::<(), anyhow::Error>(()) + // Spawn the command handler task + tokio::spawn(async move { + let _ = ready_tx.send(()); + if let Err(err) = ipfs_command_handler(ipfs, command_receiver).await { + tracing::error!(%err, "IPFS command handler failed"); + + // TODO[rafal-ch]: In the future we should make sure that: + // 1. command handler continues to work when it encounters an error + // that is recoverable (i.e. such errors are logged internally but + // not bubbled up to here) + // 2. command handler returns only in case of fatal error in which + // case we should shutdown the Hermes here. + // + // Currently we just report the error and keep the Hermes running, but + // without IPFS command handler. + } }); - if let Err(e) = result { - tracing::error!("IPFS thread error: {}", e); - } - }); + // All done + Ok::<(), anyhow::Error>(()) + })?; // Wait for the command handler to be ready before returning // This prevents the race condition where auto-subscribe happens before @@ -508,8 +527,9 @@ where N: hermes_ipfs::rust_ipfs::NetworkBehaviour + Send + tracing::debug!("IPFS command handler is ready"); Ok(Self { - sender: Some(sender), + sender: Some(command_sender), apps: AppIpfsState::new(), + _tokio_runtime: tokio_runtime, _phantom_data: PhantomData, }) } @@ -716,11 +736,10 @@ where N: hermes_ipfs::rust_ipfs::NetworkBehaviour + Send + } /// Get the peer identity - // TODO[rafal-ch]: We should not be using API errors here. async fn get_peer_identity( &self, peer: Option, - ) -> Result { + ) -> Result, Errno> { let (cmd_tx, cmd_rx) = oneshot::channel(); self.sender .as_ref() @@ -734,29 +753,30 @@ where N: hermes_ipfs::rust_ipfs::NetworkBehaviour + Send + /// Publish message to a `PubSub` topic async fn pubsub_publish( &self, - topic: PubsubTopic, + topic: &PubsubTopic, message: MessageData, ) -> Result<(), Errno> { let (cmd_tx, cmd_rx) = oneshot::channel(); self.sender .as_ref() .ok_or(Errno::PubsubPublishError)? - .send(IpfsCommand::Publish(topic, message, cmd_tx)) + .send(IpfsCommand::Publish(topic.clone(), message, cmd_tx)) .await .map_err(|_| Errno::PubsubPublishError)?; cmd_rx.await.map_err(|_| Errno::PubsubPublishError)? } - /// Subscribe to a `PubSub` topic + /// Subscribe to a `PubSub` topic. async fn pubsub_subscribe( &self, kind: SubscriptionKind, topic: &PubsubTopic, tree: Option>>>, app_name: &ApplicationName, - module_ids: Option>, + module_ids: Option<&Vec>, ) -> Result, Errno> { let (cmd_tx, cmd_rx) = oneshot::channel(); + let module_ids_owned = module_ids.cloned(); self.sender .as_ref() .ok_or(Errno::PubsubSubscribeError)? @@ -765,7 +785,7 @@ where N: hermes_ipfs::rust_ipfs::NetworkBehaviour + Send + kind, tree, app_name.clone(), - module_ids, + module_ids_owned, cmd_tx, )) .await @@ -1017,3 +1037,37 @@ fn is_valid_pubsub_content( // TODO(anyone): https://github.com/input-output-hk/hermes/issues/288 !message.is_empty() } + +/// Helper macro to keep the same `ipfs.apps` update logic for both sync +/// and async subscription calls. +#[macro_export] +macro_rules! subscribe_to_topic { + ($ipfs:expr, $kind:expr, $app_name:expr, $topic:expr, $subscribe_call:expr) => {{ + if $ipfs.apps.topic_subscriptions_contains($kind, $topic) { + tracing::debug!(app_name = %$app_name, pubsub_topic = %$topic, "topic subscription stream already exists"); + } else { + let handle = $subscribe_call?; + $ipfs.apps.added_topic_stream($kind, $topic.clone(), handle); + tracing::debug!(app_name = %$app_name, pubsub_topic = %$topic, "added subscription topic stream"); + } + $ipfs.apps + .added_app_topic_subscription($kind, $app_name.clone(), $topic.clone()); + }}; +} + +/// Helper macro to keep the same `ipfs.apps` update logic for both sync +/// and async unsubscription calls. +#[macro_export] +macro_rules! unsubscribe_from_topic { + ($ipfs:expr, $kind:expr, $app_name:expr, $topic:expr, $unsubscribe_call:expr) => {{ + if $ipfs.apps.topic_subscriptions_contains($kind, $topic) { + $unsubscribe_call?; + $ipfs.apps.removed_topic_stream($kind, $topic); + tracing::debug!(app_name = %$app_name, pubsub_topic = %$topic, "removed subscription topic stream"); + } else { + tracing::debug!(app_name = %$app_name, pubsub_topic = %$topic, "topic subscription does not exist"); + } + $ipfs.apps + .removed_app_topic_subscription($kind, $app_name, $topic); + }}; +} diff --git a/hermes/bin/src/ipfs/task.rs b/hermes/bin/src/ipfs/task.rs index 4b60f6bb8..717ba0c3e 100644 --- a/hermes/bin/src/ipfs/task.rs +++ b/hermes/bin/src/ipfs/task.rs @@ -7,10 +7,7 @@ use std::{ }; use catalyst_types::smt::Tree; -use hermes_ipfs::{ - Cid, HermesIpfs, IpfsPath as PathIpfsFile, PeerId as TargetPeerId, - doc_sync::payload::{self}, -}; +use hermes_ipfs::{Cid, HermesIpfs, IpfsPath as PathIpfsFile, PeerId as TargetPeerId}; use tokio::{ sync::{mpsc, oneshot}, task::JoinHandle, @@ -20,19 +17,22 @@ use tokio::{ use super::HERMES_IPFS; use crate::{ app::ApplicationName, - ipfs::{doc_sync::handle_doc_sync_topic, topic_message_context::TopicMessageContext}, + ipfs::{ + topic_handlers::{self, TopicMessageHandler, TopicSubscriptionStatusHandler}, + topic_message_context::TopicMessageContext, + }, runtime_extensions::{ bindings::hermes::ipfs::api::{ - DhtKey, DhtValue, Errno, IpfsFile, MessageData, PeerId, PubsubMessage, PubsubTopic, - }, - hermes::{ - doc_sync::{self, OnNewDocEvent}, - ipfs::event::OnTopicEvent, + DhtKey, DhtValue, Errno, IpfsFile, MessageData, PeerId, PubsubTopic, }, + hermes::doc_sync::{self, OnNewDocEvent}, }, wasm::module::ModuleId, }; +/// Timeout for IPFS identity call +const IDENTITY_CALL_TIMEOUT: Duration = Duration::from_millis(300); + /// Chooses how subscription messages are handled. #[derive(Copy, Clone, Debug, Default)] pub(crate) enum SubscriptionKind { @@ -88,18 +88,17 @@ pub(crate) enum IpfsCommand { /// Gets the peer identity Identity( Option, - oneshot::Sender>, + oneshot::Sender, Errno>>, ), } /// Handle IPFS commands in asynchronous task. #[allow(clippy::too_many_lines)] pub(crate) async fn ipfs_command_handler( - hermes_node: HermesIpfs, + ipfs: HermesIpfs, mut queue_rx: mpsc::Receiver, ) -> anyhow::Result<()> { - // Wrap in Arc to allow sharing across spawned tasks - let hermes_node = Arc::new(hermes_node); + let ipfs = Arc::new(ipfs); while let Some(ipfs_command) = queue_rx.recv().await { tracing::debug!( @@ -108,14 +107,14 @@ pub(crate) async fn ipfs_command_handler( ); match ipfs_command { IpfsCommand::AddFile(ipfs_file, tx) => { - let response = hermes_node + let response = ipfs .add_ipfs_file(ipfs_file) .await .map_err(|_| Errno::FileAddError); send_response(response, tx); }, IpfsCommand::GetFile(cid, tx) => { - let response = hermes_node + let response = ipfs .get_ipfs_file_cbor(&cid) .await .map_err(|_| Errno::FileGetError); @@ -124,7 +123,7 @@ pub(crate) async fn ipfs_command_handler( IpfsCommand::GetFileWithProviders(cid, providers, tx) => { // Spawn task to avoid blocking the command handler // This allows concurrent file fetches and retry logic to work - let node = Arc::clone(&hermes_node); + let node = Arc::clone(&ipfs); tokio::spawn(async move { let response = node .get_ipfs_file_cbor_with_providers(&cid, &providers) @@ -134,7 +133,7 @@ pub(crate) async fn ipfs_command_handler( }); }, IpfsCommand::PinFile(cid, tx) => { - let response = match hermes_node.insert_pin(&cid).await { + let response = match ipfs.insert_pin(&cid).await { Ok(()) => { tracing::info!("Pin succeeded for CID: {}", cid.to_string()); Ok(true) @@ -152,7 +151,7 @@ pub(crate) async fn ipfs_command_handler( send_response(response, tx); }, IpfsCommand::UnPinFile(cid, tx) => { - let response = match hermes_node.remove_pin(&cid).await { + let response = match ipfs.remove_pin(&cid).await { Ok(()) => Ok(true), Err(err) => { tracing::error!(cid = %cid, "failed to un-pin: {}", err); @@ -162,29 +161,26 @@ pub(crate) async fn ipfs_command_handler( send_response(response, tx); }, IpfsCommand::GetDhtValue(key, tx) => { - let response = hermes_node.dht_get(key.clone()).await.map_err(|err| { + let response = ipfs.dht_get(key.clone()).await.map_err(|err| { tracing::error!(dht_key = ?key, "failed to get DHT value: {}", err); Errno::DhtGetError }); send_response(response, tx); }, IpfsCommand::PutDhtValue(key, value, tx) => { - let response = hermes_node.dht_put(key, value).await.is_ok(); + let response = ipfs.dht_put(key, value).await.is_ok(); send_response(Ok(response), tx); }, IpfsCommand::Publish(topic, message, tx) => { - let result = hermes_node - .pubsub_publish(&topic, message) - .await - .map_err(|e| { - tracing::error!(topic = %topic, "pubsub_publish failed: {}", e); - Errno::PubsubPublishError - }); + let result = ipfs.pubsub_publish(&topic, message).await.map_err(|e| { + tracing::error!(topic = %topic, "pubsub_publish failed: {}", e); + Errno::PubsubPublishError + }); send_response(result, tx); }, IpfsCommand::Subscribe(topic, kind, tree, app_name, module_ids, tx) => { tracing::info!(topic, "received Subscribe request"); - let stream = hermes_node + let stream = ipfs .pubsub_subscribe(&topic) .await .map_err(|_| Errno::PubsubSubscribeError)?; @@ -193,14 +189,14 @@ pub(crate) async fn ipfs_command_handler( SubscriptionKind::Default => { TopicMessageHandler::new( &topic, - topic_message_handler, + topic_handlers::topic_message_handler, TopicMessageContext::new(None, app_name, module_ids), ) }, SubscriptionKind::DocSync => { TopicMessageHandler::new( &topic, - doc_sync_topic_message_handler, + topic_handlers::doc_sync_topic_message_handler, TopicMessageContext::new(tree, app_name, module_ids), ) }, @@ -211,42 +207,44 @@ pub(crate) async fn ipfs_command_handler( let handle = hermes_ipfs::subscription_stream_task( stream, move |msg| { - message_handler.handle(msg); + let message_handler_owned = message_handler.clone(); + async move { + message_handler_owned.handle(msg).await; + } }, move |msg| { - subscription_handler.handle(msg); + let subscription_handler_owned = subscription_handler.clone(); + async move { + subscription_handler_owned.handle(msg); + } }, ); send_response(Ok(handle), tx); }, IpfsCommand::Unsubscribe(topic, tx) => { tracing::info!(topic, "received Unsubscribe request"); - hermes_node - .pubsub_unsubscribe(topic) + ipfs.pubsub_unsubscribe(topic) .await .map_err(|_| Errno::PubsubUnsubscribeError)?; send_response(Ok(()), tx); }, IpfsCommand::EvictPeer(peer, tx) => { let peer_id = TargetPeerId::from_str(&peer).map_err(|_| Errno::InvalidPeerId)?; - let status = hermes_node.ban_peer(peer_id).await.is_ok(); + let status = ipfs.ban_peer(peer_id).await.is_ok(); send_response(Ok(status), tx); }, IpfsCommand::DhtProvide(key, tx) => { - let response = hermes_node.dht_provide(key.clone()).await.map_err(|err| { + let response = ipfs.dht_provide(key.clone()).await.map_err(|err| { tracing::error!(dht_key = ?key, "DHT provide failed: {}", err); Errno::DhtProvideError }); send_response(response, tx); }, IpfsCommand::DhtGetProviders(key, tx) => { - let response = hermes_node - .dht_get_providers(key.clone()) - .await - .map_err(|err| { - tracing::error!(dht_key = ?key, "DHT get providers failed: {}", err); - Errno::DhtGetProvidersError - }); + let response = ipfs.dht_get_providers(key.clone()).await.map_err(|err| { + tracing::error!(dht_key = ?key, "DHT get providers failed: {}", err); + Errno::DhtGetProvidersError + }); send_response(response, tx); }, IpfsCommand::Identity(peer_id, tx) => { @@ -260,17 +258,30 @@ pub(crate) async fn ipfs_command_handler( None => None, }; - let response = hermes_node.identity(peer_id).await.map_err(|err| { - tracing::error!(peer_id = ?peer_id, "Identity failed: {}", err); - Errno::GetPeerIdError - }); + // TODO[rafal-ch]: Timeout here is a workaround: https://github.com/input-output-hk/hermes/issues/798 + let response = + match tokio::time::timeout(IDENTITY_CALL_TIMEOUT, ipfs.identity(peer_id)).await + { + Ok(Ok(identity)) => { + tracing::info!("got identity"); + Ok(Some(identity)) + }, + Ok(Err(err)) => { + tracing::warn!(peer_id = ?peer_id, %err, "identity call failed"); + Ok(None) + }, + Err(_) => { + tracing::warn!(peer_id = ?peer_id, "identity call timeout"); + Ok(None) + }, + }; send_response(response, tx); }, } } // Try to stop the node - only works if this is the last reference - if let Ok(node) = Arc::try_unwrap(hermes_node) { + if let Ok(node) = Arc::try_unwrap(ipfs) { node.stop().await; } else { tracing::warn!("Could not stop IPFS node - other references still exist"); @@ -278,135 +289,6 @@ pub(crate) async fn ipfs_command_handler( Ok(()) } -/// A handler for messages from the IPFS pubsub topic -pub(super) struct TopicMessageHandler { - /// The topic. - topic: String, - - /// The handler implementation. - #[allow( - clippy::type_complexity, - reason = "to be revisited after the doc sync functionality is fully implemented as this type still evolves" - )] - callback: Box< - dyn Fn(hermes_ipfs::rust_ipfs::GossipsubMessage, String, &TopicMessageContext) - + Send - + Sync - + 'static, - >, - - /// The context. - context: TopicMessageContext, -} - -impl TopicMessageHandler { - /// Creates the new handler. - pub fn new( - topic: &impl ToString, - callback: F, - context: TopicMessageContext, - ) -> Self - where - F: Fn(hermes_ipfs::rust_ipfs::GossipsubMessage, String, &TopicMessageContext) - + Send - + Sync - + 'static, - { - Self { - topic: topic.to_string(), - callback: Box::new(callback), - context, - } - } - - /// Forwards the message to the handler. - pub fn handle( - &self, - msg: hermes_ipfs::rust_ipfs::GossipsubMessage, - ) { - (self.callback)(msg, self.topic.clone(), &self.context); - } -} - -/// A handler for subscribe/unsubscribe events from the IPFS pubsub topic -pub(super) struct TopicSubscriptionStatusHandler -where T: Fn(hermes_ipfs::SubscriptionStatusEvent, String) + Send + Sync + 'static -{ - /// The topic. - topic: String, - - /// The handler implementation. - callback: T, -} - -impl TopicSubscriptionStatusHandler -where T: Fn(hermes_ipfs::SubscriptionStatusEvent, String) + Send + Sync + 'static -{ - /// Creates the new handler. - pub fn new( - topic: &impl ToString, - callback: T, - ) -> Self { - Self { - topic: topic.to_string(), - callback, - } - } - - /// Passes the subscription event to the handler. - pub fn handle( - &self, - subscription_event: hermes_ipfs::SubscriptionStatusEvent, - ) { - (self.callback)(subscription_event, self.topic.clone()); - } -} - -/// Handler function for topic message streams. -fn topic_message_handler( - message: hermes_ipfs::rust_ipfs::GossipsubMessage, - topic: String, - context: &TopicMessageContext, -) { - if let Some(ipfs) = HERMES_IPFS.get() { - let app_names = ipfs.apps.subscribed_apps(SubscriptionKind::Default, &topic); - - drop( - OnTopicEvent::new(PubsubMessage { - topic, - message: message.data.into(), - publisher: message.source.map(|p| p.to_string()), - }) - .build_and_send(app_names, context.module_ids()), - ); - } else { - tracing::error!("Failed to send on_topic_event. IPFS is uninitialized"); - } -} - -/// Handler for Doc Sync `PubSub` messages. -#[allow( - clippy::needless_pass_by_value, - reason = "the other handler consumes the message and we need to keep the signatures consistent" -)] -fn doc_sync_topic_message_handler( - message: hermes_ipfs::rust_ipfs::GossipsubMessage, - topic: String, - context: &TopicMessageContext, -) { - if let Ok(msg_str) = std::str::from_utf8(&message.data) { - tracing::info!( - "RECEIVED PubSub message on topic: {topic} - data: {}", - &msg_str.chars().take(100).collect::() - ); - } - - let result = handle_doc_sync_topic::(&message, &topic, context); - if let Some(Err(err)) = result { - tracing::error!("Failed to handle IPFS message: {}", err); - } -} - /// Processes the received CIDs from a broadcasted message. pub(super) fn process_broadcasted_cids( topic: &str, diff --git a/hermes/bin/src/ipfs/topic_handlers.rs b/hermes/bin/src/ipfs/topic_handlers.rs new file mode 100644 index 000000000..31873d7e4 --- /dev/null +++ b/hermes/bin/src/ipfs/topic_handlers.rs @@ -0,0 +1,155 @@ +//! Hermes IPFS service. +//! +//! Handlers for IPFS topics. + +use std::{pin::Pin, sync::Arc}; + +use hermes_ipfs::doc_sync::payload::{self}; + +use super::HERMES_IPFS; +use crate::{ + ipfs::{ + SubscriptionKind, doc_sync::handle_doc_sync_topic, + topic_message_context::TopicMessageContext, + }, + runtime_extensions::{ + bindings::hermes::ipfs::api::PubsubMessage, hermes::ipfs::event::OnTopicEvent, + }, +}; + +/// A handler for messages from the IPFS pubsub topic +#[derive(Clone)] +pub(super) struct TopicMessageHandler { + /// The topic. + topic: String, + + /// The handler implementation. + #[allow( + clippy::type_complexity, + reason = "to be revisited after the doc sync functionality is fully implemented as this type still evolves" + )] + callback: Arc< + dyn Fn( + hermes_ipfs::rust_ipfs::GossipsubMessage, + String, + TopicMessageContext, /* TODO[rafal-ch]: Should become a borrow, but if not + * possible, at least an Arc */ + ) -> Pin + Send>> + + Send + + Sync + + 'static, + >, + + /// The context. + context: TopicMessageContext, +} + +impl TopicMessageHandler { + /// Creates the new handler. + pub fn new( + topic: &str, + handler: F, + context: TopicMessageContext, + ) -> Self + where + F: Fn(hermes_ipfs::rust_ipfs::GossipsubMessage, String, TopicMessageContext) -> Fut + + Send + + Sync + + 'static, + Fut: Future + Send + 'static, + { + Self { + topic: topic.to_string(), + callback: Arc::new(move |msg, topic, ctx| Box::pin(handler(msg, topic, ctx))), + context, + } + } + + /// Forwards the message to the handler. + pub async fn handle( + &self, + msg: hermes_ipfs::rust_ipfs::GossipsubMessage, + ) { + (self.callback)(msg, self.topic.clone(), self.context.clone()).await; + } +} + +/// A handler for subscribe/unsubscribe events from the IPFS pubsub topic +#[derive(Clone)] +pub(super) struct TopicSubscriptionStatusHandler +where T: Fn(hermes_ipfs::SubscriptionStatusEvent, String) + Send + Sync + 'static +{ + /// The topic. + topic: String, + + /// The handler implementation. + callback: T, +} + +impl TopicSubscriptionStatusHandler +where T: Fn(hermes_ipfs::SubscriptionStatusEvent, String) + Send + Sync + 'static +{ + /// Creates the new handler. + pub fn new( + topic: &impl ToString, + callback: T, + ) -> Self { + Self { + topic: topic.to_string(), + callback, + } + } + + /// Passes the subscription event to the handler. + pub fn handle( + &self, + subscription_event: hermes_ipfs::SubscriptionStatusEvent, + ) { + (self.callback)(subscription_event, self.topic.clone()); + } +} + +/// Handler function for topic message streams. +pub(super) async fn topic_message_handler( + message: hermes_ipfs::rust_ipfs::GossipsubMessage, + topic: String, + context: TopicMessageContext, +) { + if let Some(ipfs) = HERMES_IPFS.get() { + let app_names = ipfs.apps.subscribed_apps(SubscriptionKind::Default, &topic); + + drop( + OnTopicEvent::new(PubsubMessage { + topic, + message: message.data.into(), + publisher: message.source.map(|p| p.to_string()), + }) + .build_and_send(app_names, context.module_ids()), + ); + } else { + tracing::error!("Failed to send on_topic_event. IPFS is uninitialized"); + } +} + +/// Handler for Doc Sync `PubSub` messages. +#[allow( + clippy::needless_pass_by_value, + reason = "the other handler consumes the message and we need to keep the signatures consistent" +)] +pub(super) async fn doc_sync_topic_message_handler( + message: hermes_ipfs::rust_ipfs::GossipsubMessage, + topic: String, + context: TopicMessageContext, +) { + if let Ok(msg_str) = std::str::from_utf8(&message.data) { + tracing::info!( + "RECEIVED PubSub message on topic: {topic} - data: {}", + &msg_str.chars().take(100).collect::() + ); + } + + let result = handle_doc_sync_topic::(&message, &topic, context).await; + if let Some(Err(err)) = result { + tracing::error!("Failed to handle IPFS message: {}", err); + } +} diff --git a/hermes/bin/src/runtime_extensions/hermes/doc_sync/host.rs b/hermes/bin/src/runtime_extensions/hermes/doc_sync/host.rs index f18e10b1d..e921c0e99 100644 --- a/hermes/bin/src/runtime_extensions/hermes/doc_sync/host.rs +++ b/hermes/bin/src/runtime_extensions/hermes/doc_sync/host.rs @@ -14,7 +14,7 @@ use wasmtime::component::Resource; use super::ChannelState; use crate::{ app::ApplicationName, - ipfs::{self, SubscriptionKind, hermes_ipfs_publish, hermes_ipfs_subscribe}, + ipfs::{self, SubscriptionKind, blocking}, runtime_context::HermesRuntimeContext, runtime_extensions::{ bindings::hermes::{ @@ -131,7 +131,7 @@ impl HostSyncChannel for HermesRuntimeContext { let tree = Arc::clone(&channel_state.smt); let topic = format!("{name}{topic_suffix}"); // When the channel is created, subscribe to two topics: .new and .syn - if let Err(err) = hermes_ipfs_subscribe( + if let Err(err) = blocking::hermes_ipfs_subscribe( ipfs::SubscriptionKind::DocSync, self.app_name(), Some(tree), @@ -476,7 +476,7 @@ fn publish_new_payload( // is performed in `new()`). Invoking the subscription again to ensure // the topic is active, because Gossipsub enforces that peers must subscribe // to a topic before they are permitted to publish on it. - match hermes_ipfs_subscribe( + match blocking::hermes_ipfs_subscribe( SubscriptionKind::DocSync, ctx.app_name(), None, @@ -494,7 +494,7 @@ fn publish_new_payload( topic_new ); - match hermes_ipfs_publish(ctx.app_name(), &topic_new, payload_bytes) { + match blocking::hermes_ipfs_publish(ctx.app_name(), &topic_new, payload_bytes) { Ok(()) => { tracing::info!("✅ Step {STEP}/{POST_STEP_COUNT}: Published to PubSub → {topic_new}"); if let Some(timers) = channel_state.timers.as_ref() { @@ -565,7 +565,7 @@ fn send_new_keepalive( .map_err(|e| anyhow::anyhow!("Failed to encode payload::New: {e}"))?; let new_topic = format!("{channel_name}.new"); - hermes_ipfs_publish(app_name, &new_topic, payload_bytes) + blocking::hermes_ipfs_publish(app_name, &new_topic, payload_bytes) .map_err(|e| anyhow::Error::msg(format!("Keepalive publish failed: {e:?}")))?; Ok(()) } diff --git a/hermes/bin/src/runtime_extensions/hermes/ipfs/host.rs b/hermes/bin/src/runtime_extensions/hermes/ipfs/host.rs index a80d4e6b1..60f17df7d 100644 --- a/hermes/bin/src/runtime_extensions/hermes/ipfs/host.rs +++ b/hermes/bin/src/runtime_extensions/hermes/ipfs/host.rs @@ -4,11 +4,10 @@ use hermes_ipfs::Cid; use crate::{ ipfs::{ - self, hermes_ipfs_add_file, hermes_ipfs_content_validate, hermes_ipfs_dht_get_providers, - hermes_ipfs_dht_provide, hermes_ipfs_evict_peer, hermes_ipfs_get_dht_value, - hermes_ipfs_get_file, hermes_ipfs_get_peer_identity, hermes_ipfs_pin_file, - hermes_ipfs_publish, hermes_ipfs_put_dht_value, hermes_ipfs_subscribe, - hermes_ipfs_unpin_file, hermes_ipfs_unsubscribe, + self, blocking, hermes_ipfs_add_file, hermes_ipfs_content_validate, + hermes_ipfs_dht_get_providers, hermes_ipfs_dht_provide, hermes_ipfs_evict_peer, + hermes_ipfs_get_dht_value, hermes_ipfs_get_file, hermes_ipfs_pin_file, + hermes_ipfs_put_dht_value, hermes_ipfs_unpin_file, }, runtime_context::HermesRuntimeContext, runtime_extensions::bindings::hermes::ipfs::api::{ @@ -87,9 +86,11 @@ impl Host for HermesRuntimeContext { } fn get_peer_id(&mut self) -> wasmtime::Result> { - let identity = hermes_ipfs_get_peer_identity(self.app_name(), None)?; - let peer_id = identity.peer_id; - Ok(Ok(peer_id.to_string())) + let maybe_identity = blocking::hermes_ipfs_get_peer_identity(None)?; + match maybe_identity { + Some(identity) => Ok(Ok(identity.peer_id.to_string())), + None => Ok(Err(Errno::GetPeerIdError)), + } } fn pubsub_publish( @@ -97,14 +98,18 @@ impl Host for HermesRuntimeContext { topic: PubsubTopic, message: MessageData, ) -> wasmtime::Result> { - Ok(hermes_ipfs_publish(self.app_name(), &topic, message)) + Ok(blocking::hermes_ipfs_publish( + self.app_name(), + &topic, + message, + )) } fn pubsub_subscribe( &mut self, topic: PubsubTopic, ) -> wasmtime::Result> { - Ok(hermes_ipfs_subscribe( + Ok(blocking::hermes_ipfs_subscribe( ipfs::SubscriptionKind::Default, self.app_name(), None, @@ -117,7 +122,7 @@ impl Host for HermesRuntimeContext { &mut self, topic: PubsubTopic, ) -> wasmtime::Result> { - Ok(hermes_ipfs_unsubscribe( + Ok(blocking::hermes_ipfs_unsubscribe( ipfs::SubscriptionKind::Default, self.app_name(), &topic,