Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .config/dictionaries/project.dic
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ txos
unfinalized
unlinkat
unsub
unsubscription
usermod
usvg
utimensat
Expand All @@ -240,4 +241,4 @@ xprivate
xprv
xpub
zilla
zillable
zillable
2 changes: 1 addition & 1 deletion hermes/bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ path = "tests/integration/tests/mod.rs"

[dependencies]
# Catalyst Internal Crates
hermes-ipfs = { version = "0.0.12", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "hermes-ipfs/v0.0.12", features = ["doc-sync"] }
hermes-ipfs = { version = "0.0.13", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "hermes-ipfs/v0.0.13", features = ["doc-sync"] }
cardano-blockchain-types = { version = "0.0.9", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "cardano-blockchain-types/v0.0.9" }
cardano-chain-follower = { version = "0.0.19", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "cardano-chain-follower/v0.0.19" }
catalyst-types = { version = "0.0.12", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "catalyst-types/v0.0.12" }
Expand Down
129 changes: 28 additions & 101 deletions hermes/bin/src/ipfs/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
},
hermes::doc_sync,
},
subscribe_to_topic, unsubscribe_from_topic,
wasm::module::ModuleId,
};

Expand Down Expand Up @@ -144,38 +145,19 @@ pub(crate) fn hermes_ipfs_dht_get_providers(
}

/// Returns the peer id of the node.
pub(crate) fn hermes_ipfs_get_peer_identity(
app_name: &ApplicationName,
peer: Option<PeerId>,
) -> Result<hermes_ipfs::PeerInfo, Errno> {
pub(crate) async fn hermes_ipfs_get_peer_identity(
peer: Option<PeerId>
) -> Result<Option<hermes_ipfs::PeerInfo>, Errno> {
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;

let res = if tokio::runtime::Handle::try_current().is_ok() {
tracing::debug!("identity with existing Tokio runtime");

let (tx, rx) = std::sync::mpsc::channel();

tokio::task::spawn_blocking(move || {
let handle = tokio::runtime::Handle::current();
let res = handle.block_on(ipfs.get_peer_identity(peer));
drop(tx.send(res));
});

rx.recv().map_err(|_| Errno::PubsubPublishError)
} else {
tracing::debug!("identity without existing Tokio runtime");
let rt = tokio::runtime::Runtime::new().map_err(|_| Errno::ServiceUnavailable)?;
let identity = ipfs.get_peer_identity(peer).await?;
tracing::debug!("Got peer identity");

Ok(rt.block_on(ipfs.get_peer_identity(peer)))
}??;

tracing::debug!(app_name = %app_name, "Got peer identity");

Ok(res)
Ok(identity)
}

/// Subscribe to a topic
pub(crate) fn hermes_ipfs_subscribe(
/// Subscribe to a topic.
pub(crate) async fn hermes_ipfs_subscribe(
kind: SubscriptionKind,
app_name: &ApplicationName,
tree: Option<Arc<Mutex<Tree<doc_sync::Cid>>>>,
Expand All @@ -184,77 +166,41 @@ pub(crate) fn hermes_ipfs_subscribe(
) -> Result<bool, Errno> {
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;
tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "subscribing to PubSub topic");
let module_ids_owned = module_ids.cloned();
if ipfs.apps.topic_subscriptions_contains(kind, topic) {
tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "topic subscription stream already exists");
} else {
let topic_owned = topic.clone();
let app_name_owned = app_name.clone();
let handle = if let Ok(rt) = tokio::runtime::Handle::try_current() {
tracing::debug!("subscribe with existing Tokio runtime");
let (tx, rx) = std::sync::mpsc::channel();
tokio::task::spawn_blocking(move || {
let res = rt.block_on(ipfs.pubsub_subscribe(
kind,
&topic_owned,
tree,
&app_name_owned,
module_ids_owned,
));
drop(tx.send(res));
});
rx.recv().map_err(|_| Errno::PubsubSubscribeError)??
} else {
tracing::debug!("subscribe without existing Tokio runtime");
let rt = tokio::runtime::Runtime::new().map_err(|_| Errno::ServiceUnavailable)?;
rt.block_on(ipfs.pubsub_subscribe(kind, topic, tree, app_name, module_ids_owned))?
};

ipfs.apps.added_topic_stream(kind, topic.clone(), handle);
tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "added subscription topic stream");
}
ipfs.apps
.added_app_topic_subscription(kind, app_name.clone(), topic.clone());
subscribe_to_topic!(
ipfs,
kind,
app_name,
topic,
ipfs.pubsub_subscribe(kind, topic, tree, app_name, module_ids)
.await
);

Ok(true)
}

/// Unsubscribe from a topic
pub(crate) fn hermes_ipfs_unsubscribe(
pub(crate) async fn hermes_ipfs_unsubscribe(
kind: SubscriptionKind,
app_name: &ApplicationName,
topic: &PubsubTopic,
) -> Result<bool, Errno> {
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;
tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "unsubscribing from PubSub topic");

if ipfs.apps.topic_subscriptions_contains(kind, topic) {
let topic_owned = topic.clone();
if let Ok(rt) = tokio::runtime::Handle::try_current() {
tracing::debug!("unsubscribe with existing Tokio runtime");
let (tx, rx) = std::sync::mpsc::channel();
tokio::task::spawn_blocking(move || {
let res = rt.block_on(ipfs.pubsub_unsubscribe(&topic_owned));
let _ = tx.send(res);
});
rx.recv().map_err(|_| Errno::PubsubUnsubscribeError)??;
} else {
tracing::debug!("unsubscribe without existing Tokio runtime");
let rt = tokio::runtime::Runtime::new().map_err(|_| Errno::ServiceUnavailable)?;
rt.block_on(ipfs.pubsub_unsubscribe(topic))?;
}
unsubscribe_from_topic!(
ipfs,
kind,
app_name,
topic,
ipfs.pubsub_unsubscribe(topic).await
);

ipfs.apps.removed_topic_stream(kind, topic);
tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "removed subscription topic stream");
} else {
tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "topic subscription does not exist");
}
ipfs.apps
.removed_app_topic_subscription(kind, app_name, topic);
Ok(true)
}

/// Publish message to a topic
pub(crate) fn hermes_ipfs_publish(
pub(crate) async fn hermes_ipfs_publish(
app_name: &ApplicationName,
topic: &PubsubTopic,
message: MessageData,
Expand All @@ -269,26 +215,7 @@ pub(crate) fn hermes_ipfs_publish(
"📤 Publishing PubSub message"
);

let res = if tokio::runtime::Handle::try_current().is_ok() {
tracing::debug!("publish with existing Tokio runtime");

let (tx, rx) = std::sync::mpsc::channel();
let topic_owned = topic.clone();

tokio::task::spawn_blocking(move || {
let handle = tokio::runtime::Handle::current();
let res = handle.block_on(ipfs.pubsub_publish(topic_owned, message));
let _ = tx.send(res);
});

rx.recv().map_err(|_| Errno::PubsubPublishError)
} else {
tracing::debug!("publish without existing Tokio runtime");

let rt = tokio::runtime::Runtime::new().map_err(|_| Errno::ServiceUnavailable)?;

Ok(rt.block_on(ipfs.pubsub_publish(topic.to_string(), message)))
}?;
let res = ipfs.pubsub_publish(topic, message).await;

match &res {
Ok(()) => {
Expand Down
Loading