Skip to content

Commit 77e5a88

Browse files
authored
fix(hermes): New implementation of IPFS get and add file (#750)
* use new implementation of add and get ipfs file Signed-off-by: bkioshn <bkioshn@gmail.com> * hermes-ipfs points to branch Signed-off-by: bkioshn <bkioshn@gmail.com> * add error to hermes-ipfs wit Signed-off-by: bkioshn <bkioshn@gmail.com> * move cbor encode logic Signed-off-by: bkioshn <bkioshn@gmail.com> * fix get_ipfs_file Signed-off-by: bkioshn <bkioshn@gmail.com> * fix syntax and format Signed-off-by: bkioshn <bkioshn@gmail.com> * update hermes-ipfs to 0.0.11 Signed-off-by: bkioshn <bkioshn@gmail.com> * update function Signed-off-by: bkioshn <bkioshn@gmail.com> * handle store document to db after success post Signed-off-by: bkioshn <bkioshn@gmail.com> * handle ipfs/doc-sync module id Signed-off-by: bkioshn <bkioshn@gmail.com> * change cid to string Signed-off-by: bkioshn <bkioshn@gmail.com> * minor fix Signed-off-by: bkioshn <bkioshn@gmail.com> * fix compute cid Signed-off-by: bkioshn <bkioshn@gmail.com> * fix get file and add file Signed-off-by: bkioshn <bkioshn@gmail.com> * event module all Signed-off-by: bkioshn <bkioshn@gmail.com> * handle event module Signed-off-by: bkioshn <bkioshn@gmail.com> * fix doc data encode Signed-off-by: bkioshn <bkioshn@gmail.com> * fix linter and format Signed-off-by: bkioshn <bkioshn@gmail.com> * minor fix Signed-off-by: bkioshn <bkioshn@gmail.com> --------- Signed-off-by: bkioshn <bkioshn@gmail.com>
1 parent 3b6e5bb commit 77e5a88

File tree

15 files changed

+793
-534
lines changed

15 files changed

+793
-534
lines changed

hermes/Cargo.lock

Lines changed: 413 additions & 386 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

hermes/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,6 @@ unreachable = "deny"
5959
missing_docs_in_private_items = "deny"
6060
arithmetic_side_effects = "deny"
6161

62+
[patch."https://github.com/dariusc93/rust-ipfs"]
63+
rust-ipfs = { git = "https://github.com/cong-or/rust-ipfs", branch = "fix/bitswap-serve-late-arriving-peers" }
64+

hermes/apps/athena/modules/doc-sync/src/lib.rs

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ shared::bindings_generate!({
2525

2626
export!(Component);
2727

28-
use cardano_blockchain_types::pallas_codec::minicbor::{self, Encoder, data::Tag};
2928
use cid::{Cid, multihash::Multihash};
3029
use hermes::{
3130
doc_sync::api::{DocData, SyncChannel},
@@ -166,14 +165,27 @@ impl exports::hermes::http_gateway::event::Guest for Component {
166165
// Call channel::post (executes 4-step workflow on host)
167166
match channel::post(&body) {
168167
Ok(cid_bytes) => {
169-
let cid = String::from_utf8_lossy(&cid_bytes);
170-
Some(json_response(
171-
200,
172-
&serde_json::json!({
173-
"success": true,
174-
"cid": cid
175-
}),
176-
))
168+
match TryInto::<Cid>::try_into(cid_bytes) {
169+
Ok(cid) => {
170+
Some(json_response(
171+
200,
172+
&serde_json::json!({
173+
"success": true,
174+
"cid": cid.to_string()
175+
}),
176+
))
177+
},
178+
Err(e) => {
179+
error!(target: "doc_sync", "Failed to convert CID bytes to CID: {e:?}");
180+
Some(json_response(
181+
500,
182+
&serde_json::json!({
183+
"success": false,
184+
"error": "Failed to convert CID bytes to CID"
185+
}),
186+
))
187+
},
188+
}
177189
},
178190
Err(e) => {
179191
error!(target: "doc_sync", "Failed to post document: {e:?}");
@@ -213,7 +225,11 @@ fn json_response(
213225

214226
/// API for posting documents to IPFS `PubSub` channels.
215227
pub mod channel {
228+
use cardano_blockchain_types::pallas_codec::minicbor;
229+
use shared::utils::log::error;
230+
216231
use super::{DOC_SYNC_CHANNEL, DocData, SyncChannel, hermes};
232+
use crate::store_in_db;
217233

218234
/// Post a document to the "documents" channel. Returns the document's CID.
219235
///
@@ -223,19 +239,34 @@ pub mod channel {
223239
pub fn post(document_bytes: &DocData) -> Result<Vec<u8>, hermes::doc_sync::api::Errno> {
224240
// Create channel via host
225241
let channel = SyncChannel::new(DOC_SYNC_CHANNEL);
242+
// Encode the document to CBOR
243+
let document_bytes_cbor = minicbor::to_vec(document_bytes)
244+
.map_err(|_| hermes::doc_sync::api::Errno::DocErrorPlaceholder)?;
226245
// Post document via host (executes 4-step workflow in host)
227-
channel.post(document_bytes)
246+
match channel.post(&document_bytes) {
247+
Ok(cid) => {
248+
// If successfully posted, store document in db
249+
if let Err(err) = store_in_db(&document_bytes_cbor) {
250+
error!(target: "doc_sync::channel::post", "Failed to store doc in db: {err:?}");
251+
}
252+
return Ok(cid);
253+
},
254+
Err(err) => {
255+
error!(target: "doc_sync::channel::post", "Failed to post doc: {err:?}");
256+
return Err(err);
257+
},
258+
}
228259
}
229260
}
230261

231262
/// Stores the document in local `SQLite`: computes CID, stamps current time, and inserts
232263
/// into `document` table.
233-
fn store_in_db(doc: &DocData) -> anyhow::Result<()> {
234-
let cid = compute_cid(doc)?;
264+
fn store_in_db(doc_cbor: &DocData) -> anyhow::Result<()> {
265+
let cid = compute_cid(doc_cbor)?;
235266
let now = chrono::Utc::now();
236267
let row = InsertDocumentRow {
237268
cid,
238-
document: doc.clone(),
269+
document: doc_cbor.clone(),
239270
inserted_at: now,
240271
metadata: None,
241272
};
@@ -246,20 +277,13 @@ fn store_in_db(doc: &DocData) -> anyhow::Result<()> {
246277
}
247278

248279
/// Computes a `CIDv1` (CBOR codec, sha2-256 multihash) for the document bytes and returns its string form.
249-
/// wraps it in the IPLD CID CBOR tag (42) and returns the tagged bytes.
250-
fn compute_cid(doc: &DocData) -> anyhow::Result<Vec<u8>> {
280+
fn compute_cid(doc_cbor: &DocData) -> anyhow::Result<String> {
251281
const CBOR_CODEC: u64 = 0x51;
252-
const CID_CBOR_TAG: u64 = 42;
253282
const SHA2_256_CODE: u64 = 0x12;
254283

255-
let doc_bytes = minicbor::to_vec(doc)?;
256-
let hash = Sha256::digest(&doc_bytes);
284+
// `doc_cbor` is already CBOR-encoded, so it is hashed as-is.
285+
let hash = Sha256::digest(&doc_cbor);
257286
let digest = Multihash::wrap(SHA2_256_CODE, &hash)?;
258287
let cid = Cid::new_v1(CBOR_CODEC, digest);
259-
260-
let mut encoder = Encoder::new(Vec::new());
261-
encoder.tag(Tag::new(CID_CBOR_TAG))?;
262-
encoder.bytes(&cid.to_bytes())?;
263-
264-
Ok(encoder.into_writer())
288+
Ok(cid.to_string())
265289
}

hermes/apps/athena/shared/src/database/doc_sync/types.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::utils::sqlite;
99
#[derive(From)]
1010
pub struct InsertDocumentRow {
1111
/// CID calculated over document bytes.
12-
pub cid: Vec<u8>,
12+
pub cid: String,
1313
/// Document CBOR-encoded bytes.
1414
pub document: Vec<u8>,
1515
/// Timestamp when the document was inserted.

hermes/apps/athena/shared/src/database/sql/schema/doc_sync.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
-- Documents local storage.
22
CREATE TABLE IF NOT EXISTS document (
3-
cid BLOB NOT NULL, -- CID calculated over document bytes.
3+
cid TEXT NOT NULL, -- CID calculated over document bytes.
44
document BLOB NOT NULL, -- Document cbor-encoded bytes.
55
inserted_at TIMESTAMP NOT NULL, -- Timestamp when document was inserted at.
66
metadata BLOB, -- Cbor-encoded metadata.

hermes/bin/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ path = "tests/integration/tests/mod.rs"
3131

3232
[dependencies]
3333
# Catalyst Internal Crates
34-
hermes-ipfs = { version = "0.0.10", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "hermes-ipfs/v0.0.10", features = ["doc-sync"] }
34+
# TODO: Revert to tag after PR #763 merges
35+
hermes-ipfs = { version = "0.0.11", git = "https://github.com/input-output-hk/catalyst-libs.git", branch = "fix/bitswap-provider-connection", features = ["doc-sync"] }
3536
cardano-blockchain-types = { version = "0.0.9", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "cardano-blockchain-types/v0.0.9" }
3637
cardano-chain-follower = { version = "0.0.19", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "cardano-chain-follower/v0.0.19" }
3738
catalyst-types = { version = "0.0.11", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "catalyst-types/v0.0.11" }

hermes/bin/src/ipfs/api.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::{
55
runtime_extensions::bindings::hermes::ipfs::api::{
66
DhtKey, DhtValue, Errno, IpfsContent, IpfsFile, IpfsPath, MessageData, PeerId, PubsubTopic,
77
},
8+
wasm::module::ModuleId,
89
};
910

1011
/// Add File to IPFS
@@ -47,9 +48,8 @@ pub(crate) fn hermes_ipfs_get_file(
4748
path: &IpfsPath,
4849
) -> Result<IpfsFile, Errno> {
4950
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;
50-
tracing::debug!(app_name = %app_name, path = %path, "get IPFS file");
5151
let content = ipfs.file_get(path)?;
52-
tracing::debug!(app_name = %app_name, path = %path, "got IPFS file");
52+
tracing::debug!(app_name = %app_name, path = %path, "got IPFS file with content size {:?}", content.len());
5353
Ok(content)
5454
}
5555

@@ -151,19 +151,20 @@ pub(crate) fn hermes_ipfs_get_peer_identity(
151151
pub(crate) fn hermes_ipfs_subscribe(
152152
kind: SubscriptionKind,
153153
app_name: &ApplicationName,
154-
topic: PubsubTopic,
154+
topic: &PubsubTopic,
155+
module_ids: Option<Vec<ModuleId>>,
155156
) -> Result<bool, Errno> {
156157
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;
157158
tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "subscribing to PubSub topic");
158-
if ipfs.apps.topic_subscriptions_contains(kind, &topic) {
159+
if ipfs.apps.topic_subscriptions_contains(kind, topic) {
159160
tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "topic subscription stream already exists");
160161
} else {
161-
let handle = ipfs.pubsub_subscribe(kind, &topic)?;
162+
let handle = ipfs.pubsub_subscribe(kind, topic, module_ids)?;
162163
ipfs.apps.added_topic_stream(kind, topic.clone(), handle);
163164
tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "added subscription topic stream");
164165
}
165166
ipfs.apps
166-
.added_app_topic_subscription(kind, app_name.clone(), topic);
167+
.added_app_topic_subscription(kind, app_name.clone(), topic.clone());
167168
Ok(true)
168169
}
169170

hermes/bin/src/ipfs/mod.rs

Lines changed: 64 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub(crate) use api::{
3131
};
3232
use dashmap::DashMap;
3333
use hermes_ipfs::{
34-
AddIpfsFile, Cid, HermesIpfs, HermesIpfsBuilder, IpfsPath as BaseIpfsPath,
34+
Cid, HermesIpfs, HermesIpfsBuilder, IpfsPath as BaseIpfsPath,
3535
rust_ipfs::{Keypair, dummy},
3636
};
3737
use once_cell::sync::OnceCell;
@@ -48,6 +48,7 @@ use crate::{
4848
runtime_extensions::bindings::hermes::ipfs::api::{
4949
DhtKey, DhtValue, Errno, IpfsFile, IpfsPath, MessageData, PeerId, PubsubTopic,
5050
},
51+
wasm::module::ModuleId,
5152
};
5253

5354
/// Hermes IPFS Internal Node
@@ -140,12 +141,32 @@ async fn configure_listening_address(node: &hermes_ipfs::Ipfs) {
140141
match listen_addr.parse() {
141142
Ok(multiaddr) => {
142143
match node.add_listening_address(multiaddr).await {
143-
Ok(addr) => tracing::info!("IPFS listening on: {}", addr),
144-
Err(e) => tracing::error!("Failed to listen on port {}: {}", listen_port, e),
144+
Ok(addr) => tracing::info!("IPFS listening on: {addr}"),
145+
Err(e) => tracing::error!("Failed to listen on port {listen_port}: {e}"),
145146
}
146147
},
147148
Err(e) => tracing::error!("Invalid multiaddr format: {}", e),
148149
}
150+
151+
// Configure external/announce address if specified.
152+
// This tells other peers how to reach us (important in Docker/NAT environments).
153+
// Without this, nodes may advertise 127.0.0.1 which causes peers to connect to
154+
// themselves instead of the intended node.
155+
if let Ok(announce_addr) = std::env::var("IPFS_ANNOUNCE_ADDRESS") {
156+
match announce_addr.parse() {
157+
Ok(multiaddr) => {
158+
match node.add_external_address(multiaddr).await {
159+
Ok(()) => tracing::info!("IPFS announcing external address: {announce_addr}"),
160+
Err(e) => {
161+
tracing::error!("Failed to add external address {announce_addr}: {e}");
162+
},
163+
}
164+
},
165+
Err(e) => {
166+
tracing::error!("Invalid IPFS_ANNOUNCE_ADDRESS format '{announce_addr}': {e}");
167+
},
168+
}
169+
}
149170
}
150171

151172
/// Connect to custom bootstrap peers and retry failed connections in the background.
@@ -498,10 +519,7 @@ where N: hermes_ipfs::rust_ipfs::NetworkBehaviour<ToSwarm = Infallible> + Send +
498519
self.sender
499520
.as_ref()
500521
.ok_or(Errno::FileAddError)?
501-
.blocking_send(IpfsCommand::AddFile(
502-
AddIpfsFile::Stream((None, contents)),
503-
cmd_tx,
504-
))
522+
.blocking_send(IpfsCommand::AddFile(contents, cmd_tx))
505523
.map_err(|_| Errno::FileAddError)?;
506524
cmd_rx.blocking_recv().map_err(|_| Errno::FileAddError)?
507525
}
@@ -515,17 +533,19 @@ where N: hermes_ipfs::rust_ipfs::NetworkBehaviour<ToSwarm = Infallible> + Send +
515533
///
516534
/// ## Errors
517535
/// - `Errno::InvalidIpfsPath`: Invalid IPFS path
536+
/// - `Errno::InvalidCid`: Invalid CID in path
518537
/// - `Errno::FileGetError`: Failed to get the file
519538
pub(crate) fn file_get(
520539
&self,
521540
ipfs_path: &IpfsPath,
522541
) -> Result<IpfsFile, Errno> {
523542
let ipfs_path = BaseIpfsPath::from_str(ipfs_path).map_err(|_| Errno::InvalidIpfsPath)?;
543+
let cid = ipfs_path.root().cid().ok_or(Errno::InvalidCid)?;
524544
let (cmd_tx, cmd_rx) = oneshot::channel();
525545
self.sender
526546
.as_ref()
527547
.ok_or(Errno::FileGetError)?
528-
.blocking_send(IpfsCommand::GetFile(ipfs_path.clone(), cmd_tx))
548+
.blocking_send(IpfsCommand::GetFile(*cid, cmd_tx))
529549
.map_err(|_| Errno::FileGetError)?;
530550
cmd_rx.blocking_recv().map_err(|_| Errno::FileGetError)?
531551
}
@@ -578,33 +598,52 @@ where N: hermes_ipfs::rust_ipfs::NetworkBehaviour<ToSwarm = Infallible> + Send +
578598
cmd_rx.blocking_recv().map_err(|_| Errno::FilePinError)?
579599
}
580600

581-
/// Get file (async version)
601+
/// Get file with specific providers (async version)
582602
///
583-
/// This is the async version of `file_get` that uses `send().await` instead of
584-
/// `blocking_send()`. This is safe to call from async contexts like `PubSub`
585-
/// handlers.
603+
/// This method fetches a file from IPFS using specific providers.
586604
///
587605
/// ## Parameters
588-
/// - `ipfs_path`: The IPFS path of the file
606+
/// - `cid`: The CID of the content to fetch
607+
/// - `providers`: List of peer IDs that have the content
589608
///
590609
/// ## Errors
591-
/// - `Errno::InvalidIpfsPath`: Invalid IPFS path
592610
/// - `Errno::FileGetError`: Failed to get the file
593-
pub(crate) async fn file_get_async(
611+
pub(crate) async fn file_get_async_with_providers(
594612
&self,
595-
ipfs_path: &IpfsPath,
613+
cid: &hermes_ipfs::Cid,
614+
providers: Vec<hermes_ipfs::PeerId>,
596615
) -> Result<IpfsFile, Errno> {
597-
let ipfs_path = BaseIpfsPath::from_str(ipfs_path).map_err(|_| Errno::InvalidIpfsPath)?;
598616
let (cmd_tx, cmd_rx) = oneshot::channel();
599617
self.sender
600618
.as_ref()
601619
.ok_or(Errno::FileGetError)?
602-
.send(IpfsCommand::GetFile(ipfs_path.clone(), cmd_tx))
620+
.send(IpfsCommand::GetFileWithProviders(*cid, providers, cmd_tx))
603621
.await
604622
.map_err(|_| Errno::FileGetError)?;
605623
cmd_rx.await.map_err(|_| Errno::FileGetError)?
606624
}
607625

626+
/// Get providers of a DHT value (async version)
627+
///
628+
/// ## Parameters
629+
/// - `key`: The DHT key to look up providers for
630+
///
631+
/// ## Errors
632+
/// - `Errno::DhtGetProvidersError`: Failed to get providers
633+
pub(crate) async fn dht_get_providers_async(
634+
&self,
635+
key: DhtKey,
636+
) -> Result<HashSet<hermes_ipfs::PeerId>, Errno> {
637+
let (cmd_tx, cmd_rx) = oneshot::channel();
638+
self.sender
639+
.as_ref()
640+
.ok_or(Errno::DhtGetProvidersError)?
641+
.send(IpfsCommand::DhtGetProviders(key, cmd_tx))
642+
.await
643+
.map_err(|_| Errno::DhtGetProvidersError)?;
644+
cmd_rx.await.map_err(|_| Errno::DhtGetProvidersError)?
645+
}
646+
608647
/// Put DHT Key-Value
609648
fn dht_put(
610649
&self,
@@ -698,12 +737,18 @@ where N: hermes_ipfs::rust_ipfs::NetworkBehaviour<ToSwarm = Infallible> + Send +
698737
&self,
699738
kind: SubscriptionKind,
700739
topic: &PubsubTopic,
740+
module_ids: Option<Vec<ModuleId>>,
701741
) -> Result<JoinHandle<()>, Errno> {
702742
let (cmd_tx, cmd_rx) = oneshot::channel();
703743
self.sender
704744
.as_ref()
705745
.ok_or(Errno::PubsubSubscribeError)?
706-
.blocking_send(IpfsCommand::Subscribe(topic.clone(), kind, cmd_tx))
746+
.blocking_send(IpfsCommand::Subscribe(
747+
topic.clone(),
748+
kind,
749+
module_ids,
750+
cmd_tx,
751+
))
707752
.map_err(|_| Errno::PubsubSubscribeError)?;
708753
cmd_rx
709754
.blocking_recv()

0 commit comments

Comments
 (0)