Skip to content

Commit 32f30db

Browse files
authored
feat: Add DIPS ipfs validation (#614)
1 parent db93587 commit 32f30db

File tree

8 files changed

+932
-105
lines changed

8 files changed

+932
-105
lines changed

Cargo.lock

Lines changed: 451 additions & 67 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ members = [
44
"crates/attestation",
55
"crates/config",
66
"crates/dips",
7-
"crates/indexer-receipt",
7+
"crates/indexer-receipt",
88
"crates/monitor",
99
"crates/query",
1010
"crates/service",
@@ -82,6 +82,7 @@ prost = "0.13.4"
8282
prost-types = "0.13.3"
8383
dipper-rpc = { git = "https://github.com/edgeandnode/dipper/", rev = "c8700e2", default-features = false }
8484
tonic-build = "0.12.3"
85+
serde_yaml = "0.9.21"
8586

8687
[patch.crates-io.tap_core]
8788
git = "https://github.com/semiotic-ai/timeline-aggregation-protocol"

crates/dips/Cargo.toml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,18 @@ prost-types.workspace = true
1515
uuid.workspace = true
1616
base64.workspace = true
1717
tokio.workspace = true
18+
futures = "0.3"
19+
20+
http = "0.2"
21+
derivative = "2.2.0"
22+
ipfs-api-backend-hyper = {version = "0.6.0", features = ["with-send-sync"] }
23+
ipfs-api-prelude = {version = "0.6.0", features = ["with-send-sync"] }
24+
bytes = "1.10.0"
25+
serde_yaml.workspace = true
26+
serde.workspace = true
27+
28+
[dev-dependencies]
29+
rand = "0.9.0"
1830

1931
[build-dependencies]
2032
tonic-build = { workspace = true }

crates/dips/src/ipfs.rs

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use std::{str::FromStr, sync::Arc};
5+
6+
use derivative::Derivative;
7+
use futures::TryStreamExt;
8+
use http::Uri;
9+
use ipfs_api_prelude::{IpfsApi, TryFromUri};
10+
use serde::Deserialize;
11+
use tonic::async_trait;
12+
13+
use crate::DipsError;
14+
15+
#[async_trait]
16+
pub trait IpfsFetcher: Send + Sync + std::fmt::Debug {
17+
async fn fetch(&self, file: &str) -> Result<GraphManifest, DipsError>;
18+
}
19+
20+
#[async_trait]
21+
impl<T: IpfsFetcher> IpfsFetcher for Arc<T> {
22+
async fn fetch(&self, file: &str) -> Result<GraphManifest, DipsError> {
23+
self.as_ref().fetch(file).await
24+
}
25+
}
26+
27+
#[derive(Derivative)]
28+
#[derivative(Debug)]
29+
pub struct IpfsClient {
30+
#[derivative(Debug = "ignore")]
31+
client: ipfs_api_backend_hyper::IpfsClient,
32+
}
33+
34+
impl IpfsClient {
35+
pub fn new(url: &str) -> anyhow::Result<Self> {
36+
Ok(Self {
37+
client: ipfs_api_backend_hyper::IpfsClient::build_with_base_uri(
38+
Uri::from_str(url).map_err(anyhow::Error::from)?,
39+
),
40+
})
41+
}
42+
}
43+
44+
#[async_trait]
45+
impl IpfsFetcher for IpfsClient {
46+
async fn fetch(&self, file: &str) -> Result<GraphManifest, DipsError> {
47+
let content = self
48+
.client
49+
.get(file.as_ref())
50+
.map_ok(|chunk| chunk.to_vec())
51+
.try_concat()
52+
.await
53+
.unwrap();
54+
55+
let manifest: GraphManifest = serde_yaml::from_slice(&content)
56+
.map_err(|_| DipsError::InvalidSubgraphManifest(file.to_string()))?;
57+
58+
Ok(manifest)
59+
}
60+
}
61+
62+
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
63+
#[serde(rename_all = "camelCase")]
64+
pub struct DataSource {
65+
network: String,
66+
}
67+
68+
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
69+
#[serde(rename_all = "camelCase")]
70+
pub struct GraphManifest {
71+
data_sources: Vec<DataSource>,
72+
}
73+
74+
impl GraphManifest {
75+
pub fn network(&self) -> Option<String> {
76+
self.data_sources.first().map(|ds| ds.network.clone())
77+
}
78+
}
79+
80+
#[cfg(test)]
81+
#[derive(Debug)]
82+
pub struct TestIpfsClient {
83+
manifest: GraphManifest,
84+
}
85+
86+
#[cfg(test)]
87+
impl TestIpfsClient {
88+
pub fn mainnet() -> Self {
89+
Self {
90+
manifest: GraphManifest {
91+
data_sources: vec![DataSource {
92+
network: "mainnet".to_string(),
93+
}],
94+
},
95+
}
96+
}
97+
}
98+
99+
#[cfg(test)]
100+
#[async_trait]
101+
impl IpfsFetcher for TestIpfsClient {
102+
async fn fetch(&self, _file: &str) -> Result<GraphManifest, DipsError> {
103+
Ok(self.manifest.clone())
104+
}
105+
}
106+
107+
#[cfg(test)]
108+
mod test {
109+
use crate::ipfs::{DataSource, GraphManifest};
110+
111+
#[test]
112+
fn test_deserialize_manifest() {
113+
let manifest: GraphManifest = serde_yaml::from_str(MANIFEST).unwrap();
114+
assert_eq!(
115+
manifest,
116+
GraphManifest {
117+
data_sources: vec![
118+
DataSource {
119+
network: "scroll".to_string()
120+
},
121+
DataSource {
122+
network: "scroll".to_string()
123+
}
124+
],
125+
}
126+
)
127+
}
128+
129+
const MANIFEST: &str = "
130+
dataSources:
131+
- kind: ethereum/contract
132+
mapping:
133+
abis:
134+
- file:
135+
/: /ipfs/QmTU8eKx6pCgtff6Uvc7srAwR8BPiM3jTMBw9ahrXBjRzY
136+
name: Factory
137+
apiVersion: 0.0.6
138+
entities: []
139+
eventHandlers:
140+
- event: >-
141+
PoolCreated(indexed address,indexed address,indexed
142+
uint24,int24,address)
143+
handler: handlePoolCreated
144+
file:
145+
/: /ipfs/Qmbj3ituUaFRnTuahJ8yCG9GPiPqsRYq2T7umucZzPpLFn
146+
kind: ethereum/events
147+
language: wasm/assemblyscript
148+
name: Factory
149+
network: scroll
150+
source:
151+
abi: Factory
152+
address: '0x46B3fDF7b5CDe91Ac049936bF0bDb12c5d22202e'
153+
startBlock: 82522
154+
- kind: ethereum/contract
155+
mapping:
156+
abis:
157+
- file:
158+
/: /ipfs/QmaxxqQ7xzbGDPWu184uoq2g5sofazB9B9tEDrpPjmRZ8q
159+
name: NonfungiblePositionManager
160+
161+
apiVersion: 0.0.6
162+
entities: []
163+
eventHandlers:
164+
- event: 'IncreaseLiquidity(indexed uint256,uint128,uint256,uint256)'
165+
handler: handleIncreaseLiquidity
166+
file:
167+
/: /ipfs/QmcWrYawVufpST4u2Ed8Jz6jxFFaYXxERGwqstrpniY8C5
168+
kind: ethereum/events
169+
language: wasm/assemblyscript
170+
name: NonfungiblePositionManager
171+
network: scroll
172+
source:
173+
abi: NonfungiblePositionManager
174+
address: '0x0389879e0156033202C44BF784ac18fC02edeE4f'
175+
startBlock: 82597
176+
features:
177+
- nonFatalErrors
178+
schema:
179+
file:
180+
/: /ipfs/QmSCM39NPLAjNQXsnkqq6H8z8KBi5YkfYyApPYLQbbC2kb
181+
specVersion: 0.0.4
182+
templates:
183+
- kind: ethereum/contract
184+
mapping:
185+
abis:
186+
- file:
187+
/: /ipfs/QmULRc8Ac1J6YFy11z7JRpyThb6f7nmL5mMTQvN7LKj2Vy
188+
name: Pool
189+
- file:
190+
/: /ipfs/QmTU8eKx6pCgtff6Uvc7srAwR8BPiM3jTMBw9ahrXBjRzY
191+
name: Factory
192+
- file:
193+
/: /ipfs/QmXuTbDkNrN27VydxbS2huvKRk62PMgUTdPDWkxcr2w7j2
194+
name: ERC20
195+
apiVersion: 0.0.6
196+
entities: []
197+
eventHandlers:
198+
- event: 'Initialize(uint160,int24)'
199+
handler: handleInitialize
200+
- event: >-
201+
Swap(indexed address,indexed
202+
address,int256,int256,uint160,uint128,int24)
203+
handler: handleSwap
204+
- event: >-
205+
Mint(address,indexed address,indexed int24,indexed
206+
int24,uint128,uint256,uint256)
207+
handler: handleMint
208+
- event: >-
209+
Burn(indexed address,indexed int24,indexed
210+
int24,uint128,uint256,uint256)
211+
handler: handleBurn
212+
- event: >-
213+
Flash(indexed address,indexed
214+
address,uint256,uint256,uint256,uint256)
215+
handler: handleFlash
216+
- event: >-
217+
Collect(indexed address,address,indexed int24,indexed
218+
int24,uint128,uint128)
219+
handler: handlePoolCollect
220+
- event: 'CollectProtocol(indexed address,indexed address,uint128,uint128)'
221+
handler: handleProtocolCollect
222+
- event: 'SetFeeProtocol(uint8,uint8,uint8,uint8)'
223+
handler: handleSetProtocolFee
224+
file:
225+
/: /ipfs/QmPtcuzBcWWBGXFKGdfUgqZLJov4c4Crt85ANbER2eHdCb
226+
kind: ethereum/events
227+
language: wasm/assemblyscript
228+
name: Pool
229+
network: scroll
230+
source:
231+
abi: Pool
232+
233+
";
234+
}

0 commit comments

Comments
 (0)