Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 14 additions & 7 deletions crates/dips/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,34 @@ rpc = ["dep:prost", "dep:tonic", "dep:tonic-build", "dep:bytes"]
db = ["dep:sqlx"]

[dependencies]
build-info.workspace = true
thiserror.workspace = true
anyhow.workspace = true
alloy-rlp = "0.3.10"
thegraph-core.workspace = true
async-trait.workspace = true
build-info.workspace = true
prost-types.workspace = true
uuid.workspace = true
base64.workspace = true
tokio.workspace = true
indexer-monitor = { path = "../monitor" }

bytes = { version = "1.10.0", optional = true }
derivative = "2.2.0"

futures = "0.3"
http = "0.2"
prost = { workspace = true, optional = true }
ipfs-api-backend-hyper = { version = "0.6.0", features = ["with-send-sync"] }
ipfs-api-prelude = { version = "0.6.0", features = ["with-send-sync"] }
prost = { workspace = true, optional = true }
serde.workspace = true
serde_yaml.workspace = true
serde.workspace = true
sqlx = { workspace = true, optional = true }
thegraph-core.workspace = true
thiserror.workspace = true
tokio.workspace = true
tonic = { workspace = true, optional = true }
uuid.workspace = true

[dev-dependencies]
rand = "0.9.0"
indexer-watcher = { path = "../watcher" }

[build-dependencies]
tonic-build = { workspace = true, optional = true }
121 changes: 83 additions & 38 deletions crates/dips/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

use std::{str::FromStr, sync::Arc};

use ipfs::IpfsFetcher;
use price::PriceCalculator;
use server::DipsServerContext;
use thegraph_core::alloy::{
core::primitives::Address,
primitives::{b256, ChainId, PrimitiveSignature as Signature, Uint, B256},
Expand All @@ -21,6 +20,7 @@ pub mod price;
pub mod proto;
#[cfg(feature = "rpc")]
pub mod server;
pub mod signers;
pub mod store;

use store::AgreementStore;
Expand Down Expand Up @@ -194,14 +194,16 @@ impl SignedIndexingAgreementVoucher {
// TODO: Validate all values
pub fn validate(
&self,
signer_validator: &Arc<dyn signers::SignerValidator>,
domain: &Eip712Domain,
expected_payee: &Address,
allowed_payers: impl AsRef<[Address]>,
) -> Result<(), DipsError> {
let sig = Signature::from_str(&self.signature.to_string())
.map_err(|err| DipsError::InvalidSignature(err.to_string()))?;

let payer = sig
let payer = self.voucher.payer;
let signer = sig
.recover_address_from_prehash(&self.voucher.eip712_signing_hash(domain))
.map_err(|err| DipsError::InvalidSignature(err.to_string()))?;

Expand All @@ -211,6 +213,10 @@ impl SignedIndexingAgreementVoucher {
return Err(DipsError::PayerNotAuthorised(payer));
}

signer_validator
.validate(&payer, &signer)
.map_err(|_| DipsError::SignerNotAuthorised(signer))?;

if !self.voucher.recipient.eq(expected_payee) {
return Err(DipsError::UnexpectedPayee {
expected: *expected_payee,
Expand Down Expand Up @@ -288,14 +294,18 @@ impl CollectionRequest {
}

pub async fn validate_and_create_agreement(
store: Arc<dyn AgreementStore>,
ctx: Arc<DipsServerContext>,
domain: &Eip712Domain,
expected_payee: &Address,
allowed_payers: impl AsRef<[Address]>,
voucher: Vec<u8>,
price_calculator: &PriceCalculator,
ipfs_client: Arc<dyn IpfsFetcher>,
) -> Result<Uuid, DipsError> {
let DipsServerContext {
store,
ipfs_fetcher,
price_calculator,
signer_validator,
} = ctx.as_ref();
let decoded_voucher = SignedIndexingAgreementVoucher::abi_decode(voucher.as_ref(), true)
.map_err(|e| DipsError::AbiDecoding(e.to_string()))?;
let metadata = SubgraphIndexingVoucherMetadata::abi_decode(
Expand All @@ -304,9 +314,9 @@ pub async fn validate_and_create_agreement(
)
.map_err(|e| DipsError::AbiDecoding(e.to_string()))?;

decoded_voucher.validate(domain, expected_payee, allowed_payers)?;
decoded_voucher.validate(signer_validator, domain, expected_payee, allowed_payers)?;

let manifest = ipfs_client.fetch(&metadata.subgraphDeploymentId).await?;
let manifest = ipfs_fetcher.fetch(&metadata.subgraphDeploymentId).await?;
match manifest.network() {
Some(chain_id) if chain_id == metadata.chainId => {}
Some(chain_id) => {
Expand Down Expand Up @@ -374,10 +384,11 @@ pub async fn validate_and_cancel_agreement(
#[cfg(test)]
mod test {
use std::{
sync::Arc,
collections::HashMap,
time::{Duration, SystemTime, UNIX_EPOCH},
};

use indexer_monitor::EscrowAccounts;
use rand::{distr::Alphanumeric, Rng};
use thegraph_core::alloy::{
primitives::{Address, FixedBytes, U256},
Expand All @@ -388,9 +399,9 @@ mod test {

pub use crate::store::{AgreementStore, InMemoryAgreementStore};
use crate::{
dips_agreement_eip712_domain, dips_cancellation_eip712_domain, ipfs::TestIpfsClient,
price::PriceCalculator, CancellationRequest, DipsError, IndexingAgreementVoucher,
SignedIndexingAgreementVoucher, SubgraphIndexingVoucherMetadata,
dips_agreement_eip712_domain, dips_cancellation_eip712_domain, server::DipsServerContext,
CancellationRequest, DipsError, IndexingAgreementVoucher, SignedIndexingAgreementVoucher,
SubgraphIndexingVoucherMetadata,
};

#[tokio::test]
Expand Down Expand Up @@ -428,22 +439,19 @@ mod test {
let abi_voucher = voucher.abi_encode();
let id = Uuid::from_bytes(voucher.voucher.agreement_id.into());

let store = Arc::new(InMemoryAgreementStore::default());

let ctx = DipsServerContext::for_testing();
let actual_id = super::validate_and_create_agreement(
store.clone(),
ctx.clone(),
&domain,
&payee_addr,
vec![payer_addr],
abi_voucher,
&PriceCalculator::for_testing(),
Arc::new(TestIpfsClient::mainnet()),
)
.await
.unwrap();
assert_eq!(actual_id, id);

let stored_agreement = store.get_by_id(actual_id).await.unwrap().unwrap();
let stored_agreement = ctx.store.get_by_id(actual_id).await.unwrap().unwrap();

assert_eq!(voucher, stored_agreement.voucher);
assert!(!stored_agreement.cancelled);
Expand All @@ -452,6 +460,7 @@ mod test {

#[test]
fn voucher_signature_verification() {
let ctx = DipsServerContext::for_testing();
let deployment_id = "Qmbg1qF4YgHjiVfsVt6a13ddrVcRtWyJQfD4LA3CwHM29f".to_string();
let payee = PrivateKeySigner::random();
let payee_addr = payee.address();
Expand Down Expand Up @@ -484,23 +493,34 @@ mod test {
let signed = voucher.sign(&domain, payer).unwrap();
assert_eq!(
signed
.validate(&domain, &payee_addr, vec![])
.validate(&ctx.signer_validator, &domain, &payee_addr, vec![])
.unwrap_err()
.to_string(),
DipsError::PayerNotAuthorised(voucher.payer).to_string()
);
assert!(signed
.validate(&domain, &payee_addr, vec![payer_addr])
.validate(
&ctx.signer_validator,
&domain,
&payee_addr,
vec![payer_addr]
)
.is_ok());
}

#[test]
fn check_voucher_modified() {
let deployment_id = "Qmbg1qF4YgHjiVfsVt6a13ddrVcRtWyJQfD4LA3CwHM29f".to_string();
#[tokio::test]
async fn check_voucher_modified() {
let payee = PrivateKeySigner::random();
let payee_addr = payee.address();
let payer = PrivateKeySigner::random();
let payer_addr = payer.address();
let ctx = DipsServerContext::for_testing_mocked_accounts(EscrowAccounts::new(
HashMap::default(),
HashMap::from_iter(vec![(payer_addr, vec![payer_addr])]),
))
.await;

let deployment_id = "Qmbg1qF4YgHjiVfsVt6a13ddrVcRtWyJQfD4LA3CwHM29f".to_string();

let metadata = SubgraphIndexingVoucherMetadata {
basePricePerEpoch: U256::from(10000_u64),
Expand Down Expand Up @@ -530,9 +550,14 @@ mod test {

assert!(matches!(
signed
.validate(&domain, &payee_addr, vec![payer_addr])
.validate(
&ctx.signer_validator,
&domain,
&payee_addr,
vec![payer_addr]
)
.unwrap_err(),
DipsError::PayerNotAuthorised(_)
DipsError::SignerNotAuthorised(_)
));
}

Expand Down Expand Up @@ -603,9 +628,10 @@ mod test {
dips_agreement_eip712_domain()
}

pub fn test_voucher(
pub fn test_voucher_with_signer(
&self,
metadata: SubgraphIndexingVoucherMetadata,
signer: PrivateKeySigner,
) -> SignedIndexingAgreementVoucher {
let agreement_id = Uuid::now_v7();

Expand All @@ -628,14 +654,21 @@ mod test {
metadata: metadata.abi_encode().into(),
};

voucher.sign(&domain, self.payer.clone()).unwrap()
voucher.sign(&domain, signer).unwrap()
}

pub fn test_voucher(
&self,
metadata: SubgraphIndexingVoucherMetadata,
) -> SignedIndexingAgreementVoucher {
self.test_voucher_with_signer(metadata, self.payer.clone())
}
}

#[tokio::test]
async fn test_create_and_cancel_agreement() -> anyhow::Result<()> {
let ctx = DipsServerContext::for_testing();
let voucher_ctx = VoucherContext::random();
let store = Arc::new(InMemoryAgreementStore::default());

// Create metadata and voucher
let metadata = SubgraphIndexingVoucherMetadata {
Expand All @@ -649,13 +682,11 @@ mod test {

// Create agreement
let agreement_id = super::validate_and_create_agreement(
store.clone(),
ctx.clone(),
&voucher_ctx.domain(),
&voucher_ctx.payee.address(),
vec![voucher_ctx.payer.address()],
signed_voucher.encode_vec(),
&PriceCalculator::for_testing(),
Arc::new(TestIpfsClient::mainnet()),
)
.await?;

Expand All @@ -668,7 +699,7 @@ mod test {

// Cancel agreement
let cancelled_id = super::validate_and_cancel_agreement(
store.clone(),
ctx.store.clone(),
&cancel_domain,
signed_cancel.encode_vec(),
)
Expand All @@ -677,7 +708,7 @@ mod test {
assert_eq!(agreement_id, cancelled_id);

// Verify agreement is cancelled
let stored_agreement = store.get_by_id(agreement_id).await?.unwrap();
let stored_agreement = ctx.store.get_by_id(agreement_id).await?.unwrap();
assert!(stored_agreement.cancelled);

Ok(())
Expand All @@ -686,7 +717,14 @@ mod test {
#[tokio::test]
async fn test_create_validations_errors() -> anyhow::Result<()> {
let voucher_ctx = VoucherContext::random();
let store = Arc::new(InMemoryAgreementStore::default());
let ctx = DipsServerContext::for_testing_mocked_accounts(EscrowAccounts::new(
HashMap::default(),
HashMap::from_iter(vec![(
voucher_ctx.payer.address(),
vec![voucher_ctx.payer.address()],
)]),
))
.await;

let metadata = SubgraphIndexingVoucherMetadata {
basePricePerEpoch: U256::from(10000_u64),
Expand Down Expand Up @@ -716,6 +754,9 @@ mod test {
subgraphDeploymentId: voucher_ctx.deployment_id.clone(),
};

let signer = PrivateKeySigner::random();
let valid_voucher_invalid_signer =
voucher_ctx.test_voucher_with_signer(metadata.clone(), signer.clone());
let valid_voucher = voucher_ctx.test_voucher(metadata);

let expected_result: Vec<Result<[u8; 16], DipsError>> = vec![
Expand All @@ -728,23 +769,27 @@ mod test {
100,
"10".to_string(),
)),
Err(DipsError::SignerNotAuthorised(signer.address())),
Ok(valid_voucher
.voucher
.agreement_id
.as_slice()
.try_into()
.unwrap()),
];
let cases = vec![wrong_network_voucher, low_price_voucher, valid_voucher];
let cases = vec![
wrong_network_voucher,
low_price_voucher,
valid_voucher_invalid_signer,
valid_voucher,
];
for (voucher, result) in cases.into_iter().zip(expected_result.into_iter()) {
let out = super::validate_and_create_agreement(
store.clone(),
ctx.clone(),
&voucher_ctx.domain(),
&voucher_ctx.payee.address(),
vec![voucher_ctx.payer.address()],
voucher.encode_vec(),
&PriceCalculator::for_testing(),
Arc::new(TestIpfsClient::mainnet()),
)
.await;

Expand Down
Loading
Loading