Skip to content

Commit 910eb27

Browse files
committed
feat: Add DIPS ipfs validation
1 parent 704c49c commit 910eb27

File tree

8 files changed

+657
-26
lines changed

8 files changed

+657
-26
lines changed

Cargo.lock

Lines changed: 321 additions & 19 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ members = [
44
"crates/attestation",
55
"crates/config",
66
"crates/dips",
7-
"crates/indexer-receipt",
7+
"crates/indexer-receipt",
88
"crates/monitor",
99
"crates/query",
1010
"crates/service",
@@ -82,6 +82,7 @@ prost = "0.13.4"
8282
prost-types = "0.13.3"
8383
dipper-rpc = { git = "https://github.com/edgeandnode/dipper/", rev = "c8700e2", default-features = false }
8484
tonic-build = "0.12.3"
85+
serde_yaml = "0.9.21"
8586

8687
[patch.crates-io.tap_core]
8788
git = "https://github.com/semiotic-ai/timeline-aggregation-protocol"

crates/dips/Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,15 @@ prost-types.workspace = true
1515
uuid.workspace = true
1616
base64.workspace = true
1717
tokio.workspace = true
18+
futures = "0.3"
19+
20+
http = "0.2"
21+
derivative = "2.2.0"
22+
ipfs-api-backend-hyper = {version = "0.6.0", features = ["with-send-sync"] }
23+
ipfs-api-prelude = {version = "0.6.0", features = ["with-send-sync"] }
24+
bytes = "1.10.0"
25+
serde_yaml.workspace = true
26+
serde.workspace = true
1827

1928
[build-dependencies]
2029
tonic-build = { workspace = true }

crates/dips/src/ipfs.rs

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
use std::{str::FromStr, sync::Arc};
2+
3+
use derivative::Derivative;
4+
use futures::TryStreamExt;
5+
use http::Uri;
6+
use ipfs_api_prelude::{IpfsApi, TryFromUri};
7+
use serde::Deserialize;
8+
use tonic::async_trait;
9+
10+
use crate::DipsError;
11+
12+
#[async_trait]
13+
pub trait IpfsFetcher: Send + Sync + std::fmt::Debug {
14+
async fn fetch(&self, file: &str) -> Result<GraphManifest, DipsError>;
15+
}
16+
17+
#[async_trait]
18+
impl<T: IpfsFetcher> IpfsFetcher for Arc<T> {
19+
async fn fetch(&self, file: &str) -> Result<GraphManifest, DipsError> {
20+
self.as_ref().fetch(file).await
21+
}
22+
}
23+
24+
#[derive(Derivative)]
25+
#[derivative(Debug)]
26+
pub struct IpfsClient {
27+
#[derivative(Debug = "ignore")]
28+
client: ipfs_api_backend_hyper::IpfsClient,
29+
}
30+
31+
impl IpfsClient {
32+
pub fn new(url: &str) -> anyhow::Result<Self> {
33+
Ok(Self {
34+
client: ipfs_api_backend_hyper::IpfsClient::build_with_base_uri(
35+
Uri::from_str(url).map_err(|err| anyhow::Error::from(err))?,
36+
),
37+
})
38+
}
39+
}
40+
41+
#[async_trait]
42+
impl IpfsFetcher for IpfsClient {
43+
async fn fetch(&self, file: &str) -> Result<GraphManifest, DipsError> {
44+
let content = self
45+
.client
46+
.get(file.as_ref())
47+
.map_ok(|chunk| chunk.to_vec())
48+
.try_concat()
49+
.await
50+
.unwrap();
51+
52+
let manifest: GraphManifest = serde_yaml::from_slice(&content)
53+
.map_err(|_| DipsError::InvalidSubgraphManifest(file.to_string()))?;
54+
55+
Ok(manifest)
56+
}
57+
}
58+
59+
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
60+
#[serde(rename_all = "camelCase")]
61+
pub struct DataSource {
62+
network: String,
63+
}
64+
65+
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
66+
#[serde(rename_all = "camelCase")]
67+
pub struct GraphManifest {
68+
data_sources: Vec<DataSource>,
69+
}
70+
71+
impl GraphManifest {
72+
pub fn network(&self) -> Option<String> {
73+
self.data_sources.first().map(|ds| ds.network.clone())
74+
}
75+
}
76+
77+
#[cfg(test)]
78+
#[derive(Debug)]
79+
pub struct TestIpfsClient {
80+
manifest: GraphManifest,
81+
}
82+
83+
#[cfg(test)]
84+
impl TestIpfsClient {
85+
pub fn mainnet() -> Self {
86+
Self {
87+
manifest: GraphManifest {
88+
data_sources: vec![DataSource {
89+
network: "mainnet".to_string(),
90+
}],
91+
},
92+
}
93+
}
94+
}
95+
96+
#[cfg(test)]
97+
#[async_trait]
98+
impl IpfsFetcher for TestIpfsClient {
99+
async fn fetch(&self, _file: &str) -> Result<GraphManifest, DipsError> {
100+
Ok(self.manifest.clone())
101+
}
102+
}
103+
104+
#[cfg(test)]
105+
mod test {
106+
use crate::ipfs::{DataSource, GraphManifest};
107+
108+
#[test]
109+
fn test_deserialize_manifest() {
110+
let manifest: GraphManifest = serde_yaml::from_str(&MANIFEST).unwrap();
111+
assert_eq!(
112+
manifest,
113+
GraphManifest {
114+
data_sources: vec![
115+
DataSource {
116+
network: "scroll".to_string()
117+
},
118+
DataSource {
119+
network: "scroll".to_string()
120+
}
121+
],
122+
}
123+
)
124+
}
125+
126+
const MANIFEST: &'static str = "
127+
dataSources:
128+
- kind: ethereum/contract
129+
mapping:
130+
abis:
131+
- file:
132+
/: /ipfs/QmTU8eKx6pCgtff6Uvc7srAwR8BPiM3jTMBw9ahrXBjRzY
133+
name: Factory
134+
apiVersion: 0.0.6
135+
entities: []
136+
eventHandlers:
137+
- event: >-
138+
PoolCreated(indexed address,indexed address,indexed
139+
uint24,int24,address)
140+
handler: handlePoolCreated
141+
file:
142+
/: /ipfs/Qmbj3ituUaFRnTuahJ8yCG9GPiPqsRYq2T7umucZzPpLFn
143+
kind: ethereum/events
144+
language: wasm/assemblyscript
145+
name: Factory
146+
network: scroll
147+
source:
148+
abi: Factory
149+
address: '0x46B3fDF7b5CDe91Ac049936bF0bDb12c5d22202e'
150+
startBlock: 82522
151+
- kind: ethereum/contract
152+
mapping:
153+
abis:
154+
- file:
155+
/: /ipfs/QmaxxqQ7xzbGDPWu184uoq2g5sofazB9B9tEDrpPjmRZ8q
156+
name: NonfungiblePositionManager
157+
158+
apiVersion: 0.0.6
159+
entities: []
160+
eventHandlers:
161+
- event: 'IncreaseLiquidity(indexed uint256,uint128,uint256,uint256)'
162+
handler: handleIncreaseLiquidity
163+
file:
164+
/: /ipfs/QmcWrYawVufpST4u2Ed8Jz6jxFFaYXxERGwqstrpniY8C5
165+
kind: ethereum/events
166+
language: wasm/assemblyscript
167+
name: NonfungiblePositionManager
168+
network: scroll
169+
source:
170+
abi: NonfungiblePositionManager
171+
address: '0x0389879e0156033202C44BF784ac18fC02edeE4f'
172+
startBlock: 82597
173+
features:
174+
- nonFatalErrors
175+
schema:
176+
file:
177+
/: /ipfs/QmSCM39NPLAjNQXsnkqq6H8z8KBi5YkfYyApPYLQbbC2kb
178+
specVersion: 0.0.4
179+
templates:
180+
- kind: ethereum/contract
181+
mapping:
182+
abis:
183+
- file:
184+
/: /ipfs/QmULRc8Ac1J6YFy11z7JRpyThb6f7nmL5mMTQvN7LKj2Vy
185+
name: Pool
186+
- file:
187+
/: /ipfs/QmTU8eKx6pCgtff6Uvc7srAwR8BPiM3jTMBw9ahrXBjRzY
188+
name: Factory
189+
- file:
190+
/: /ipfs/QmXuTbDkNrN27VydxbS2huvKRk62PMgUTdPDWkxcr2w7j2
191+
name: ERC20
192+
apiVersion: 0.0.6
193+
entities: []
194+
eventHandlers:
195+
- event: 'Initialize(uint160,int24)'
196+
handler: handleInitialize
197+
- event: >-
198+
Swap(indexed address,indexed
199+
address,int256,int256,uint160,uint128,int24)
200+
handler: handleSwap
201+
- event: >-
202+
Mint(address,indexed address,indexed int24,indexed
203+
int24,uint128,uint256,uint256)
204+
handler: handleMint
205+
- event: >-
206+
Burn(indexed address,indexed int24,indexed
207+
int24,uint128,uint256,uint256)
208+
handler: handleBurn
209+
- event: >-
210+
Flash(indexed address,indexed
211+
address,uint256,uint256,uint256,uint256)
212+
handler: handleFlash
213+
- event: >-
214+
Collect(indexed address,address,indexed int24,indexed
215+
int24,uint128,uint128)
216+
handler: handlePoolCollect
217+
- event: 'CollectProtocol(indexed address,indexed address,uint128,uint128)'
218+
handler: handleProtocolCollect
219+
- event: 'SetFeeProtocol(uint8,uint8,uint8,uint8)'
220+
handler: handleSetProtocolFee
221+
file:
222+
/: /ipfs/QmPtcuzBcWWBGXFKGdfUgqZLJov4c4Crt85ANbER2eHdCb
223+
kind: ethereum/events
224+
language: wasm/assemblyscript
225+
name: Pool
226+
network: scroll
227+
source:
228+
abi: Pool
229+
230+
";
231+
}

crates/dips/src/lib.rs

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,18 @@
33

44
use std::{str::FromStr, sync::Arc};
55

6+
use ipfs::IpfsFetcher;
7+
use price::PriceCalculator;
68
use thegraph_core::alloy::{
79
core::primitives::Address,
8-
primitives::{b256, ChainId, PrimitiveSignature as Signature, B256},
10+
primitives::{b256, ChainId, PrimitiveSignature as Signature, Uint, B256},
911
signers::SignerSync,
1012
sol,
1113
sol_types::{eip712_domain, Eip712Domain, SolStruct, SolValue},
1214
};
1315

16+
pub mod ipfs;
17+
pub mod price;
1418
pub mod proto;
1519
pub mod server;
1620
pub mod store;
@@ -131,6 +135,14 @@ pub enum DipsError {
131135
PayerNotAuthorised(Address),
132136
#[error("voucher payee {actual} does not match the expected address {expected}")]
133137
UnexpectedPayee { expected: Address, actual: Address },
138+
#[error("invalid subgraph id {0}")]
139+
InvalidSubgraphManifest(String),
140+
#[error("voucher for chain id {0}, subgraph manifest has network {1}")]
141+
SubgraphChainIdMistmatch(String, String),
142+
#[error("chainId {0} is not supported")]
143+
UnsupportedChainId(String),
144+
#[error("price per block is below configured price for chain {0}, minimum: {1}, offered: {2}")]
145+
PricePerBlockTooLow(String, u64, String),
134146
// cancellation
135147
#[error("cancelled_by is expected to match the signer")]
136148
UnexpectedSigner,
@@ -276,6 +288,8 @@ pub async fn validate_and_create_agreement(
276288
expected_payee: &Address,
277289
allowed_payers: impl AsRef<[Address]>,
278290
voucher: Vec<u8>,
291+
price_calculator: &PriceCalculator,
292+
ipfs_client: Arc<dyn IpfsFetcher>,
279293
) -> Result<Uuid, DipsError> {
280294
let decoded_voucher = SignedIndexingAgreementVoucher::abi_decode(voucher.as_ref(), true)
281295
.map_err(|e| DipsError::AbiDecoding(e.to_string()))?;
@@ -287,6 +301,35 @@ pub async fn validate_and_create_agreement(
287301

288302
decoded_voucher.validate(domain, expected_payee, allowed_payers)?;
289303

304+
let manifest = ipfs_client.fetch(&metadata.subgraphDeploymentId).await?;
305+
match manifest.network() {
306+
Some(chain_id) if chain_id == metadata.chainId => {}
307+
Some(chain_id) => {
308+
return Err(DipsError::SubgraphChainIdMistmatch(
309+
metadata.chainId,
310+
chain_id,
311+
))
312+
}
313+
None => return Err(DipsError::UnsupportedChainId("".to_string())),
314+
}
315+
316+
let chain_id = manifest
317+
.network()
318+
.ok_or_else(|| DipsError::UnsupportedChainId("".to_string()))?;
319+
320+
let offered_price = decoded_voucher.voucher.maxOngoingAmountPerEpoch;
321+
match price_calculator.get_minimum_price(&chain_id) {
322+
Some(price) if offered_price.lt(&Uint::from(price)) => {
323+
return Err(DipsError::PricePerBlockTooLow(
324+
chain_id,
325+
price,
326+
offered_price.to_string(),
327+
))
328+
}
329+
Some(_) => {}
330+
None => return Err(DipsError::UnsupportedChainId(chain_id)),
331+
}
332+
290333
store
291334
.create_agreement(decoded_voucher.clone(), metadata)
292335
.await?;
@@ -339,8 +382,9 @@ mod test {
339382

340383
pub use crate::store::{AgreementStore, InMemoryAgreementStore};
341384
use crate::{
342-
dips_agreement_eip712_domain, dips_cancellation_eip712_domain, CancellationRequest,
343-
DipsError, IndexingAgreementVoucher, SubgraphIndexingVoucherMetadata,
385+
dips_agreement_eip712_domain, dips_cancellation_eip712_domain, ipfs::TestIpfsClient,
386+
price::PriceCalculator, CancellationRequest, DipsError, IndexingAgreementVoucher,
387+
SubgraphIndexingVoucherMetadata,
344388
};
345389

346390
#[tokio::test]
@@ -355,7 +399,7 @@ mod test {
355399
basePricePerEpoch: U256::from(10000_u64),
356400
pricePerEntity: U256::from(100_u64),
357401
protocolNetwork: "eip155:42161".to_string(),
358-
chainId: "eip155:1".to_string(),
402+
chainId: "mainnet".to_string(),
359403
subgraphDeploymentId: deployment_id,
360404
};
361405

@@ -386,6 +430,8 @@ mod test {
386430
&payee_addr,
387431
vec![payer_addr],
388432
abi_voucher,
433+
&PriceCalculator::for_testing(),
434+
Arc::new(TestIpfsClient::mainnet()),
389435
)
390436
.await
391437
.unwrap();
@@ -544,7 +590,7 @@ mod test {
544590
basePricePerEpoch: U256::from(10000_u64),
545591
pricePerEntity: U256::from(100_u64),
546592
protocolNetwork: "eip155:42161".to_string(),
547-
chainId: "eip155:1".to_string(),
593+
chainId: "mainnet".to_string(),
548594
subgraphDeploymentId: deployment_id,
549595
};
550596

@@ -575,6 +621,8 @@ mod test {
575621
&payee_addr,
576622
vec![payer_addr],
577623
signed_voucher.encode_vec(),
624+
&PriceCalculator::for_testing(),
625+
Arc::new(TestIpfsClient::mainnet()),
578626
)
579627
.await?;
580628

0 commit comments

Comments
 (0)