Skip to content

Commit 90e949d

Browse files
authored
IPFS: use IPFS Gateway API (#5600)
* ipfs: create new gateway and rpc clients Introduces a new IPFS gateway client, refactoring the existing RPC API client to reuse the new code. Also introduces some new types and concepts to make working with IPFS easier. * ipfs: use new ipfs types instead of old ones Integrates the new IPFS types and clients into the existing codebase, replacing the old types and client.
1 parent 2509212 commit 90e949d

File tree

28 files changed

+2758
-1111
lines changed

28 files changed

+2758
-1111
lines changed

Cargo.lock

Lines changed: 77 additions & 333 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ async-graphql-axum = "7.0.6"
3232
axum = "0.7.5"
3333
chrono = "0.4.38"
3434
clap = { version = "4.5.4", features = ["derive", "env"] }
35+
derivative = "2.2.0"
3536
diesel = { version = "2.1.3", features = ["postgres", "serde_json", "numeric", "r2d2", "chrono", "uuid"] }
3637
diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
3738
diesel-dynamic-schema = "0.2.1"

core/Cargo.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,4 @@ anyhow = "1.0"
2525

2626
[dev-dependencies]
2727
tower-test = { git = "https://github.com/tower-rs/tower.git" }
28-
ipfs-api-backend-hyper = "0.6"
29-
ipfs-api = { version = "0.17.0", features = [
30-
"with-hyper-rustls",
31-
], default-features = false }
3228
uuid = { version = "1.9.1", features = ["v4"] }

core/src/polling_monitor/ipfs_service.rs

Lines changed: 45 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,27 @@
1+
use std::sync::Arc;
2+
use std::time::Duration;
3+
14
use anyhow::{anyhow, Error};
25
use bytes::Bytes;
36
use graph::futures03::future::BoxFuture;
4-
use graph::{
5-
derive::CheapClone,
6-
ipfs_client::{CidFile, IpfsClient},
7-
prelude::CheapClone,
8-
};
9-
use std::time::Duration;
7+
use graph::ipfs::ContentPath;
8+
use graph::ipfs::IpfsClient;
9+
use graph::ipfs::IpfsError;
10+
use graph::{derive::CheapClone, prelude::CheapClone};
1011
use tower::{buffer::Buffer, ServiceBuilder, ServiceExt};
1112

12-
const CLOUDFLARE_TIMEOUT: u16 = 524;
13-
const GATEWAY_TIMEOUT: u16 = 504;
14-
15-
pub type IpfsService = Buffer<CidFile, BoxFuture<'static, Result<Option<Bytes>, Error>>>;
13+
pub type IpfsService = Buffer<ContentPath, BoxFuture<'static, Result<Option<Bytes>, Error>>>;
1614

1715
pub fn ipfs_service(
18-
client: IpfsClient,
16+
client: Arc<dyn IpfsClient>,
1917
max_file_size: usize,
2018
timeout: Duration,
2119
rate_limit: u16,
2220
) -> IpfsService {
2321
let ipfs = IpfsServiceInner {
2422
client,
25-
max_file_size,
2623
timeout,
24+
max_file_size,
2725
};
2826

2927
let svc = ServiceBuilder::new()
@@ -38,37 +36,30 @@ pub fn ipfs_service(
3836

3937
#[derive(Clone, CheapClone)]
4038
struct IpfsServiceInner {
41-
client: IpfsClient,
42-
max_file_size: usize,
39+
client: Arc<dyn IpfsClient>,
4340
timeout: Duration,
41+
max_file_size: usize,
4442
}
4543

4644
impl IpfsServiceInner {
47-
async fn call_inner(self, req: CidFile) -> Result<Option<Bytes>, Error> {
48-
let CidFile { cid, path } = req;
49-
let multihash = cid.hash().code();
45+
async fn call_inner(self, path: ContentPath) -> Result<Option<Bytes>, Error> {
46+
let multihash = path.cid().hash().code();
5047
if !SAFE_MULTIHASHES.contains(&multihash) {
5148
return Err(anyhow!("CID multihash {} is not allowed", multihash));
5249
}
5350

54-
let cid_str = match path {
55-
Some(path) => format!("{}/{}", cid, path),
56-
None => cid.to_string(),
57-
};
58-
5951
let res = self
6052
.client
61-
.cat_all(&cid_str, Some(self.timeout), self.max_file_size)
53+
.cat(&path, self.max_file_size, Some(self.timeout))
6254
.await;
6355

6456
match res {
6557
Ok(file_bytes) => Ok(Some(file_bytes)),
66-
Err(e) => match e.status().map(|e| e.as_u16()) {
67-
// Timeouts in IPFS mean the file is not available, so we return `None`
68-
Some(GATEWAY_TIMEOUT) | Some(CLOUDFLARE_TIMEOUT) => return Ok(None),
69-
_ if e.is_timeout() => return Ok(None),
70-
_ => return Err(e.into()),
71-
},
58+
Err(IpfsError::RequestFailed(err)) if err.is_timeout() => {
59+
// Timeouts in IPFS mean that the content is not available, so we return `None`.
60+
Ok(None)
61+
}
62+
Err(err) => Err(err.into()),
7263
}
7364
}
7465
}
@@ -96,48 +87,42 @@ const SAFE_MULTIHASHES: [u64; 15] = [
9687

9788
#[cfg(test)]
9889
mod test {
99-
use ipfs::IpfsApi;
100-
use ipfs_api as ipfs;
101-
use std::{fs, str::FromStr, time::Duration};
90+
use std::time::Duration;
91+
92+
use graph::components::link_resolver::ArweaveClient;
93+
use graph::components::link_resolver::ArweaveResolver;
94+
use graph::data::value::Word;
95+
use graph::ipfs::test_utils::add_files_to_local_ipfs_node_for_testing;
96+
use graph::ipfs::IpfsRpcClient;
97+
use graph::ipfs::ServerAddress;
98+
use graph::tokio;
10299
use tower::ServiceExt;
103-
104-
use cid::Cid;
105-
use graph::{
106-
components::link_resolver::{ArweaveClient, ArweaveResolver},
107-
data::value::Word,
108-
ipfs_client::IpfsClient,
109-
tokio,
110-
};
111-
112100
use uuid::Uuid;
113101

102+
use super::*;
103+
114104
#[tokio::test]
115105
async fn cat_file_in_folder() {
116-
let path = "./tests/fixtures/ipfs_folder";
117-
let uid = Uuid::new_v4().to_string();
118-
fs::write(format!("{}/random.txt", path), &uid).unwrap();
106+
let random_bytes = Uuid::new_v4().as_bytes().to_vec();
107+
let ipfs_file = ("dir/file.txt", random_bytes.clone());
119108

120-
let cl: ipfs::IpfsClient = ipfs::IpfsClient::default();
109+
let add_resp = add_files_to_local_ipfs_node_for_testing([ipfs_file])
110+
.await
111+
.unwrap();
121112

122-
let rsp = cl.add_path(path).await.unwrap();
113+
let dir_cid = add_resp.into_iter().find(|x| x.name == "dir").unwrap().hash;
123114

124-
let ipfs_folder = rsp.iter().find(|rsp| rsp.name == "ipfs_folder").unwrap();
115+
let client =
116+
IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &graph::log::discard())
117+
.unwrap()
118+
.into_boxed();
125119

126-
let local = IpfsClient::localhost();
127-
let cid = Cid::from_str(&ipfs_folder.hash).unwrap();
128-
let file = "random.txt".to_string();
120+
let svc = ipfs_service(client.into(), 100000, Duration::from_secs(30), 10);
129121

130-
let svc = super::ipfs_service(local, 100000, Duration::from_secs(5), 10);
122+
let path = ContentPath::new(format!("{dir_cid}/file.txt")).unwrap();
123+
let content = svc.oneshot(path).await.unwrap().unwrap();
131124

132-
let content = svc
133-
.oneshot(super::CidFile {
134-
cid,
135-
path: Some(file),
136-
})
137-
.await
138-
.unwrap()
139-
.unwrap();
140-
assert_eq!(content.to_vec(), uid.as_bytes().to_vec());
125+
assert_eq!(content.to_vec(), random_bytes);
141126
}
142127

143128
#[tokio::test]

core/src/subgraph/context/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use graph::{
1818
CausalityRegion, DataSource, DataSourceTemplate,
1919
},
2020
derive::CheapClone,
21-
ipfs_client::CidFile,
21+
ipfs::ContentPath,
2222
prelude::{
2323
BlockNumber, BlockPtr, BlockState, CancelGuard, CheapClone, DeploymentHash,
2424
MetricsRegistry, RuntimeHostBuilder, SubgraphCountMetric, SubgraphInstanceMetrics,
@@ -228,8 +228,8 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
228228
}
229229

230230
pub struct OffchainMonitor {
231-
ipfs_monitor: PollingMonitor<CidFile>,
232-
ipfs_monitor_rx: mpsc::UnboundedReceiver<(CidFile, Bytes)>,
231+
ipfs_monitor: PollingMonitor<ContentPath>,
232+
ipfs_monitor_rx: mpsc::UnboundedReceiver<(ContentPath, Bytes)>,
233233
arweave_monitor: PollingMonitor<Base64>,
234234
arweave_monitor_rx: mpsc::UnboundedReceiver<(Base64, Bytes)>,
235235
}

graph/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ atomic_refcell = "0.1.13"
1515
old_bigdecimal = { version = "=0.1.2", features = ["serde"], package = "bigdecimal" }
1616
bytes = "1.0.1"
1717
cid = "0.11.1"
18+
derivative = { workspace = true }
1819
graph_derive = { path = "./derive" }
1920
diesel = { workspace = true }
2021
diesel_derives = { workspace = true }
@@ -90,7 +91,7 @@ defer = "0.2"
9091
# Our fork contains patches to make some fields optional for Celo and Fantom compatibility.
9192
# Without the "arbitrary_precision" feature, we get the error `data did not match any variant of untagged enum Response`.
9293
web3 = { git = "https://github.com/graphprotocol/rust-web3", branch = "graph-patches-onto-0.18", features = [
93-
"arbitrary_precision","test"
94+
"arbitrary_precision", "test"
9495
] }
9596
serde_plain = "1.0.2"
9697
csv = "1.3.0"
@@ -100,6 +101,7 @@ object_store = { version = "0.10.1", features = ["gcp"] }
100101
clap.workspace = true
101102
maplit = "1.0.2"
102103
hex-literal = "0.4"
104+
wiremock = "0.6.1"
103105

104106
[build-dependencies]
105107
tonic-build = { workspace = true }

0 commit comments

Comments
 (0)