Skip to content

Commit 28706c1

Browse files
authored
chore(hermes): Rework async in HermesIpfs (#797)
* Get rid of the superfluous OS thread and `block_on()` call * Minor renames for clarity * Support async topic message handlers in IPFS * Async fixes to IPFS identity calls * Async cleanup for `pubsub_publish()` * Async cleanup for `pubsub_unsubscribe()` * Extract `_blocking` function flavors to a dedicated file * Extract even more `_blocking` function flavors to a dedicated file * Unify handling of `ipfs.apps` in (un)subscription calls * Fix comment * Fix comment * Remove the unneeded passing of Tokio runtime * Extract handlers to `topic_handlers` module * Bump `hermes_ipfs` version * Fix lints * Update project dictionary
1 parent 26bd89c commit 28706c1

File tree

11 files changed

+611
-386
lines changed

11 files changed

+611
-386
lines changed

.config/dictionaries/project.dic

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ txos
219219
unfinalized
220220
unlinkat
221221
unsub
222+
unsubscription
222223
usermod
223224
usvg
224225
utimensat
@@ -240,4 +241,4 @@ xprivate
240241
xprv
241242
xpub
242243
zilla
243-
zillable
244+
zillable

hermes/bin/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ path = "tests/integration/tests/mod.rs"
3131

3232
[dependencies]
3333
# Catalyst Internal Crates
34-
hermes-ipfs = { version = "0.0.12", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "hermes-ipfs/v0.0.12", features = ["doc-sync"] }
34+
hermes-ipfs = { version = "0.0.13", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "hermes-ipfs/v0.0.13", features = ["doc-sync"] }
3535
cardano-blockchain-types = { version = "0.0.9", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "cardano-blockchain-types/v0.0.9" }
3636
cardano-chain-follower = { version = "0.0.19", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "cardano-chain-follower/v0.0.19" }
3737
catalyst-types = { version = "0.0.12", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "catalyst-types/v0.0.12" }

hermes/bin/src/ipfs/api.rs

Lines changed: 28 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::{
1313
},
1414
hermes::doc_sync,
1515
},
16+
subscribe_to_topic, unsubscribe_from_topic,
1617
wasm::module::ModuleId,
1718
};
1819

@@ -144,38 +145,19 @@ pub(crate) fn hermes_ipfs_dht_get_providers(
144145
}
145146

146147
/// Returns the peer id of the node.
147-
pub(crate) fn hermes_ipfs_get_peer_identity(
148-
app_name: &ApplicationName,
149-
peer: Option<PeerId>,
150-
) -> Result<hermes_ipfs::PeerInfo, Errno> {
148+
pub(crate) async fn hermes_ipfs_get_peer_identity(
149+
peer: Option<PeerId>
150+
) -> Result<Option<hermes_ipfs::PeerInfo>, Errno> {
151151
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;
152152

153-
let res = if tokio::runtime::Handle::try_current().is_ok() {
154-
tracing::debug!("identity with existing Tokio runtime");
155-
156-
let (tx, rx) = std::sync::mpsc::channel();
157-
158-
tokio::task::spawn_blocking(move || {
159-
let handle = tokio::runtime::Handle::current();
160-
let res = handle.block_on(ipfs.get_peer_identity(peer));
161-
drop(tx.send(res));
162-
});
163-
164-
rx.recv().map_err(|_| Errno::PubsubPublishError)
165-
} else {
166-
tracing::debug!("identity without existing Tokio runtime");
167-
let rt = tokio::runtime::Runtime::new().map_err(|_| Errno::ServiceUnavailable)?;
153+
let identity = ipfs.get_peer_identity(peer).await?;
154+
tracing::debug!("Got peer identity");
168155

169-
Ok(rt.block_on(ipfs.get_peer_identity(peer)))
170-
}??;
171-
172-
tracing::debug!(app_name = %app_name, "Got peer identity");
173-
174-
Ok(res)
156+
Ok(identity)
175157
}
176158

177-
/// Subscribe to a topic
178-
pub(crate) fn hermes_ipfs_subscribe(
159+
/// Subscribe to a topic.
160+
pub(crate) async fn hermes_ipfs_subscribe(
179161
kind: SubscriptionKind,
180162
app_name: &ApplicationName,
181163
tree: Option<Arc<Mutex<Tree<doc_sync::Cid>>>>,
@@ -184,77 +166,41 @@ pub(crate) fn hermes_ipfs_subscribe(
184166
) -> Result<bool, Errno> {
185167
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;
186168
tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "subscribing to PubSub topic");
187-
let module_ids_owned = module_ids.cloned();
188-
if ipfs.apps.topic_subscriptions_contains(kind, topic) {
189-
tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "topic subscription stream already exists");
190-
} else {
191-
let topic_owned = topic.clone();
192-
let app_name_owned = app_name.clone();
193-
let handle = if let Ok(rt) = tokio::runtime::Handle::try_current() {
194-
tracing::debug!("subscribe with existing Tokio runtime");
195-
let (tx, rx) = std::sync::mpsc::channel();
196-
tokio::task::spawn_blocking(move || {
197-
let res = rt.block_on(ipfs.pubsub_subscribe(
198-
kind,
199-
&topic_owned,
200-
tree,
201-
&app_name_owned,
202-
module_ids_owned,
203-
));
204-
drop(tx.send(res));
205-
});
206-
rx.recv().map_err(|_| Errno::PubsubSubscribeError)??
207-
} else {
208-
tracing::debug!("subscribe without existing Tokio runtime");
209-
let rt = tokio::runtime::Runtime::new().map_err(|_| Errno::ServiceUnavailable)?;
210-
rt.block_on(ipfs.pubsub_subscribe(kind, topic, tree, app_name, module_ids_owned))?
211-
};
212169

213-
ipfs.apps.added_topic_stream(kind, topic.clone(), handle);
214-
tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "added subscription topic stream");
215-
}
216-
ipfs.apps
217-
.added_app_topic_subscription(kind, app_name.clone(), topic.clone());
170+
subscribe_to_topic!(
171+
ipfs,
172+
kind,
173+
app_name,
174+
topic,
175+
ipfs.pubsub_subscribe(kind, topic, tree, app_name, module_ids)
176+
.await
177+
);
178+
218179
Ok(true)
219180
}
220181

221182
/// Unsubscribe from a topic
222-
pub(crate) fn hermes_ipfs_unsubscribe(
183+
pub(crate) async fn hermes_ipfs_unsubscribe(
223184
kind: SubscriptionKind,
224185
app_name: &ApplicationName,
225186
topic: &PubsubTopic,
226187
) -> Result<bool, Errno> {
227188
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;
228189
tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "unsubscribing from PubSub topic");
229190

230-
if ipfs.apps.topic_subscriptions_contains(kind, topic) {
231-
let topic_owned = topic.clone();
232-
if let Ok(rt) = tokio::runtime::Handle::try_current() {
233-
tracing::debug!("unsubscribe with existing Tokio runtime");
234-
let (tx, rx) = std::sync::mpsc::channel();
235-
tokio::task::spawn_blocking(move || {
236-
let res = rt.block_on(ipfs.pubsub_unsubscribe(&topic_owned));
237-
let _ = tx.send(res);
238-
});
239-
rx.recv().map_err(|_| Errno::PubsubUnsubscribeError)??;
240-
} else {
241-
tracing::debug!("unsubscribe without existing Tokio runtime");
242-
let rt = tokio::runtime::Runtime::new().map_err(|_| Errno::ServiceUnavailable)?;
243-
rt.block_on(ipfs.pubsub_unsubscribe(topic))?;
244-
}
191+
unsubscribe_from_topic!(
192+
ipfs,
193+
kind,
194+
app_name,
195+
topic,
196+
ipfs.pubsub_unsubscribe(topic).await
197+
);
245198

246-
ipfs.apps.removed_topic_stream(kind, topic);
247-
tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "removed subscription topic stream");
248-
} else {
249-
tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "topic subscription does not exist");
250-
}
251-
ipfs.apps
252-
.removed_app_topic_subscription(kind, app_name, topic);
253199
Ok(true)
254200
}
255201

256202
/// Publish message to a topic
257-
pub(crate) fn hermes_ipfs_publish(
203+
pub(crate) async fn hermes_ipfs_publish(
258204
app_name: &ApplicationName,
259205
topic: &PubsubTopic,
260206
message: MessageData,
@@ -269,26 +215,7 @@ pub(crate) fn hermes_ipfs_publish(
269215
"📤 Publishing PubSub message"
270216
);
271217

272-
let res = if tokio::runtime::Handle::try_current().is_ok() {
273-
tracing::debug!("publish with existing Tokio runtime");
274-
275-
let (tx, rx) = std::sync::mpsc::channel();
276-
let topic_owned = topic.clone();
277-
278-
tokio::task::spawn_blocking(move || {
279-
let handle = tokio::runtime::Handle::current();
280-
let res = handle.block_on(ipfs.pubsub_publish(topic_owned, message));
281-
let _ = tx.send(res);
282-
});
283-
284-
rx.recv().map_err(|_| Errno::PubsubPublishError)
285-
} else {
286-
tracing::debug!("publish without existing Tokio runtime");
287-
288-
let rt = tokio::runtime::Runtime::new().map_err(|_| Errno::ServiceUnavailable)?;
289-
290-
Ok(rt.block_on(ipfs.pubsub_publish(topic.to_string(), message)))
291-
}?;
218+
let res = ipfs.pubsub_publish(topic, message).await;
292219

293220
match &res {
294221
Ok(()) => {

0 commit comments

Comments
 (0)