From 5ca1bf8654a35dd74ef3869abf4cab145d388582 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 29 Jan 2025 20:10:55 +0100 Subject: [PATCH 1/4] refactor!: add v2 proto refactor!: move grpc structs to v1 refactor: create shared uint128 proto Signed-off-by: Gustavo Inacio --- tap_aggregator/Cargo.toml | 2 +- tap_aggregator/build.rs | 10 +- tap_aggregator/proto/tap_aggregator.proto | 13 +- tap_aggregator/proto/uint128.proto | 12 ++ tap_aggregator/proto/v2.proto | 51 ++++++ tap_aggregator/src/grpc.rs | 214 +++++++++++----------- tap_aggregator/src/server.rs | 2 +- tap_aggregator/tests/aggregate_test.rs | 2 +- 8 files changed, 191 insertions(+), 115 deletions(-) create mode 100644 tap_aggregator/proto/uint128.proto create mode 100644 tap_aggregator/proto/v2.proto diff --git a/tap_aggregator/Cargo.toml b/tap_aggregator/Cargo.toml index 225ccf01..37300161 100644 --- a/tap_aggregator/Cargo.toml +++ b/tap_aggregator/Cargo.toml @@ -39,7 +39,7 @@ tokio.workspace = true tonic = { version = "0.12.3", features = ["transport", "zstd"] } tower = { version = "0.5.2", features = ["util", "steer"] } tracing-subscriber = "0.3.17" -tap_graph = { version = "0.2.0", path = "../tap_graph" } +tap_graph = { version = "0.2.0", path = "../tap_graph", features = ["v2"] } [build-dependencies] tonic-build = "0.12.3" diff --git a/tap_aggregator/build.rs b/tap_aggregator/build.rs index 9e7a9860..9a963780 100644 --- a/tap_aggregator/build.rs +++ b/tap_aggregator/build.rs @@ -6,6 +6,14 @@ fn main() -> Result<(), Box> { let out_dir = std::env::var("OUT_DIR").expect("OUT_DIR not set by Cargo"); println!("OUT_DIR: {}", out_dir); // This should print the output directory - tonic_build::compile_protos("./proto/tap_aggregator.proto")?; + tonic_build::configure().compile_protos( + &[ + "proto/uint128.proto", + "proto/tap_aggregator.proto", + "proto/v2.proto", + ], + &["proto"], + )?; + Ok(()) } diff --git a/tap_aggregator/proto/tap_aggregator.proto b/tap_aggregator/proto/tap_aggregator.proto index 42679369..2fed65d1 100644 --- a/tap_aggregator/proto/tap_aggregator.proto +++ b/tap_aggregator/proto/tap_aggregator.proto @@ -4,11 +4,13 @@ syntax = "proto3"; package tap_aggregator.v1; +import "uint128.proto"; + message Receipt { bytes allocation_id = 1; uint64 timestamp_ns = 2; uint64 nonce = 3; - Uint128 value = 4; + grpc.uint128.Uint128 value = 4; } message SignedReceipt { @@ -19,7 +21,7 @@ message SignedReceipt { message ReceiptAggregateVoucher { bytes allocation_id = 1; uint64 timestamp_ns = 2; - Uint128 value_aggregate = 3; + grpc.uint128.Uint128 value_aggregate = 3; } message SignedRav { @@ -39,10 +41,3 @@ message RavResponse { service TapAggregator { rpc AggregateReceipts(RavRequest) returns (RavResponse); } - -message Uint128 { - // Highest 64 bits of a 128 bit number. - uint64 high = 1; - // Lowest 64 bits of a 128 bit number. - uint64 low = 2; -} diff --git a/tap_aggregator/proto/uint128.proto b/tap_aggregator/proto/uint128.proto new file mode 100644 index 00000000..0c73aec0 --- /dev/null +++ b/tap_aggregator/proto/uint128.proto @@ -0,0 +1,12 @@ +// Copyright 2023-, Semiotic AI, Inc. +// SPDX-License-Identifier: Apache-2.0 + +syntax = "proto3"; +package grpc.uint128; + +message Uint128 { + // Highest 64 bits of a 128 bit number. + uint64 high = 1; + // Lowest 64 bits of a 128 bit number. + uint64 low = 2; +} diff --git a/tap_aggregator/proto/v2.proto b/tap_aggregator/proto/v2.proto new file mode 100644 index 00000000..86d11147 --- /dev/null +++ b/tap_aggregator/proto/v2.proto @@ -0,0 +1,51 @@ +// Copyright 2023-, Semiotic AI, Inc. +// SPDX-License-Identifier: Apache-2.0 + +syntax = "proto3"; +package tap_aggregator.v2; + +import "uint128.proto"; + +message Receipt { + bytes allocation_id = 1; + bytes payer = 2; + bytes data_service = 3; + bytes service_provider = 4; + uint64 timestamp_ns = 5; + uint64 nonce = 6; + grpc.uint128.Uint128 value = 7; +} + +message SignedReceipt { + Receipt message = 1; + bytes signature = 2; +} + +message ReceiptAggregateVoucher { + bytes allocation_id = 1; + bytes payer = 2; + bytes data_service = 3; + bytes service_provider = 4; + uint64 timestamp_ns = 5; + grpc.uint128.Uint128 value_aggregate = 6; + bytes metadata = 7; +} + +message SignedRav { + ReceiptAggregateVoucher message = 1; + bytes signature = 2; +} + +message RavRequest { + repeated SignedReceipt receipts = 1; + optional SignedRav previous_rav = 2; +} + +message RavResponse { + SignedRav rav = 1; +} + +service TapAggregator { + rpc AggregateReceipts(RavRequest) returns (RavResponse); +} + diff --git a/tap_aggregator/src/grpc.rs b/tap_aggregator/src/grpc.rs index 753e23af..3922e882 100644 --- a/tap_aggregator/src/grpc.rs +++ b/tap_aggregator/src/grpc.rs @@ -1,135 +1,145 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 -use anyhow::anyhow; -use tap_core::signed_message::Eip712SignedMessage; - -tonic::include_proto!("tap_aggregator.v1"); - -impl TryFrom for tap_graph::Receipt { - type Error = anyhow::Error; - fn try_from(receipt: Receipt) -> Result { - Ok(Self { - allocation_id: receipt.allocation_id.as_slice().try_into()?, - timestamp_ns: receipt.timestamp_ns, - value: receipt.value.ok_or(anyhow!("Missing value"))?.into(), - nonce: receipt.nonce, - }) +pub mod uint128 { + tonic::include_proto!("grpc.uint128"); + + impl From for u128 { + fn from(Uint128 { high, low }: Uint128) -> Self { + ((high as u128) << 64) | low as u128 + } } -} -impl TryFrom for tap_graph::SignedReceipt { - type Error = anyhow::Error; - fn try_from(receipt: SignedReceipt) -> Result { - Ok(Self { - signature: receipt.signature.as_slice().try_into()?, - message: receipt - .message - .ok_or(anyhow!("Missing message"))? - .try_into()?, - }) + impl From for Uint128 { + fn from(value: u128) -> Self { + let high = (value >> 64) as u64; + let low = value as u64; + Self { high, low } + } } } -impl From for Receipt { - fn from(value: tap_graph::Receipt) -> Self { - Self { - allocation_id: value.allocation_id.as_slice().to_vec(), - timestamp_ns: value.timestamp_ns, - nonce: value.nonce, - value: Some(value.value.into()), +pub mod v1 { + use anyhow::anyhow; + use tap_core::signed_message::Eip712SignedMessage; + + tonic::include_proto!("tap_aggregator.v1"); + + impl TryFrom for tap_graph::Receipt { + type Error = anyhow::Error; + fn try_from(receipt: self::Receipt) -> Result { + Ok(Self { + allocation_id: receipt.allocation_id.as_slice().try_into()?, + timestamp_ns: receipt.timestamp_ns, + value: receipt.value.ok_or(anyhow!("Missing value"))?.into(), + nonce: receipt.nonce, + }) } } -} -impl From for SignedReceipt { - fn from(value: tap_graph::SignedReceipt) -> Self { - Self { - message: Some(value.message.into()), - signature: value.signature.as_bytes().to_vec(), + impl TryFrom for tap_graph::SignedReceipt { + type Error = anyhow::Error; + fn try_from(receipt: self::SignedReceipt) -> Result { + Ok(Self { + signature: receipt.signature.as_slice().try_into()?, + message: receipt + .message + .ok_or(anyhow!("Missing message"))? + .try_into()?, + }) } } -} -impl TryFrom for Eip712SignedMessage { - type Error = anyhow::Error; - fn try_from(voucher: SignedRav) -> Result { - Ok(Self { - signature: voucher.signature.as_slice().try_into()?, - message: voucher - .message - .ok_or(anyhow!("Missing message"))? - .try_into()?, - }) + impl From for self::Receipt { + fn from(value: tap_graph::Receipt) -> Self { + Self { + allocation_id: value.allocation_id.as_slice().to_vec(), + timestamp_ns: value.timestamp_ns, + nonce: value.nonce, + value: Some(value.value.into()), + } + } } -} -impl From> for SignedRav { - fn from(voucher: Eip712SignedMessage) -> Self { - Self { - signature: voucher.signature.as_bytes().to_vec(), - message: Some(voucher.message.into()), + impl From for self::SignedReceipt { + fn from(value: tap_graph::SignedReceipt) -> Self { + Self { + message: Some(value.message.into()), + signature: value.signature.as_bytes().to_vec(), + } } } -} -impl TryFrom for tap_graph::ReceiptAggregateVoucher { - type Error = anyhow::Error; - fn try_from(voucher: ReceiptAggregateVoucher) -> Result { - Ok(Self { - allocationId: voucher.allocation_id.as_slice().try_into()?, - timestampNs: voucher.timestamp_ns, - valueAggregate: voucher - .value_aggregate - .ok_or(anyhow!("Missing Value Aggregate"))? - .into(), - }) + impl TryFrom for Eip712SignedMessage { + type Error = anyhow::Error; + fn try_from(voucher: self::SignedRav) -> Result { + Ok(Self { + signature: voucher.signature.as_slice().try_into()?, + message: voucher + .message + .ok_or(anyhow!("Missing message"))? + .try_into()?, + }) + } } -} -impl From for ReceiptAggregateVoucher { - fn from(voucher: tap_graph::ReceiptAggregateVoucher) -> Self { - Self { - allocation_id: voucher.allocationId.to_vec(), - timestamp_ns: voucher.timestampNs, - value_aggregate: Some(voucher.valueAggregate.into()), + impl From> for self::SignedRav { + fn from(voucher: Eip712SignedMessage) -> Self { + Self { + signature: voucher.signature.as_bytes().to_vec(), + message: Some(voucher.message.into()), + } } } -} -impl From for u128 { - fn from(Uint128 { high, low }: Uint128) -> Self { - ((high as u128) << 64) | low as u128 + impl TryFrom for tap_graph::ReceiptAggregateVoucher { + type Error = anyhow::Error; + fn try_from(voucher: self::ReceiptAggregateVoucher) -> Result { + Ok(Self { + allocationId: voucher.allocation_id.as_slice().try_into()?, + timestampNs: voucher.timestamp_ns, + valueAggregate: voucher + .value_aggregate + .ok_or(anyhow!("Missing Value Aggregate"))? + .into(), + }) + } } -} -impl From for Uint128 { - fn from(value: u128) -> Self { - let high = (value >> 64) as u64; - let low = value as u64; - Self { high, low } + impl From for self::ReceiptAggregateVoucher { + fn from(voucher: tap_graph::ReceiptAggregateVoucher) -> Self { + Self { + allocation_id: voucher.allocationId.to_vec(), + timestamp_ns: voucher.timestampNs, + value_aggregate: Some(voucher.valueAggregate.into()), + } + } } -} -impl RavRequest { - pub fn new( - receipts: Vec, - previous_rav: Option, - ) -> Self { - Self { - receipts: receipts.into_iter().map(Into::into).collect(), - previous_rav: previous_rav.map(Into::into), + impl self::RavRequest { + pub fn new( + receipts: Vec, + previous_rav: Option, + ) -> Self { + Self { + receipts: receipts.into_iter().map(Into::into).collect(), + previous_rav: previous_rav.map(Into::into), + } } } -} -impl RavResponse { - pub fn signed_rav(mut self) -> anyhow::Result { - let signed_rav: tap_graph::SignedRav = self - .rav - .take() - .ok_or(anyhow!("Couldn't find rav"))? - .try_into()?; - Ok(signed_rav) + impl self::RavResponse { + pub fn signed_rav(mut self) -> anyhow::Result { + let signed_rav: tap_graph::SignedRav = self + .rav + .take() + .ok_or(anyhow!("Couldn't find rav"))? + .try_into()?; + Ok(signed_rav) + } } } + +pub mod v2 { + tonic::include_proto!("tap_aggregator.v2"); +} diff --git a/tap_aggregator/src/server.rs b/tap_aggregator/src/server.rs index 30e2078f..4d0b771c 100644 --- a/tap_aggregator/src/server.rs +++ b/tap_aggregator/src/server.rs @@ -27,7 +27,7 @@ use crate::{ TAP_RPC_API_VERSIONS_DEPRECATED, }, error_codes::{JsonRpcErrorCode, JsonRpcWarningCode}, - grpc::{ + grpc::v1::{ tap_aggregator_server::{TapAggregator, TapAggregatorServer}, RavRequest, RavResponse, }, diff --git a/tap_aggregator/tests/aggregate_test.rs b/tap_aggregator/tests/aggregate_test.rs index e4302e55..b53fb01a 100644 --- a/tap_aggregator/tests/aggregate_test.rs +++ b/tap_aggregator/tests/aggregate_test.rs @@ -6,7 +6,7 @@ use std::{collections::HashSet, str::FromStr}; use alloy::{primitives::Address, signers::local::PrivateKeySigner}; use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params}; use tap_aggregator::{ - grpc::{tap_aggregator_client::TapAggregatorClient, RavRequest}, + grpc::v1::{tap_aggregator_client::TapAggregatorClient, RavRequest}, jsonrpsee_helpers::JsonRpcResponse, server, }; From 84c065c45e330c82c357d2521f8b3e3a33f94d2f Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 29 Jan 2025 20:11:04 +0100 Subject: [PATCH 2/4] chore: add justfile Signed-off-by: Gustavo Inacio --- justfile | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 justfile diff --git a/justfile b/justfile new file mode 100644 index 00000000..d9e9ace7 --- /dev/null +++ b/justfile @@ -0,0 +1,11 @@ +fmt: + cargo +nightly fmt + +clippy: + cargo +nightly clippy --all-targets --all-features + +build: + cargo build + +test: + cargo nextest run From 525f42f059d01b21040f292e42070cdbf17b9bc3 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 29 Jan 2025 21:38:37 +0100 Subject: [PATCH 3/4] feat: add tap v2 aggregator Signed-off-by: Gustavo Inacio --- tap_aggregator/src/aggregator.rs | 349 +-------------------- tap_aggregator/src/aggregator/v1.rs | 350 +++++++++++++++++++++ tap_aggregator/src/aggregator/v2.rs | 462 ++++++++++++++++++++++++++++ tap_aggregator/src/grpc.rs | 132 ++++++++ tap_aggregator/src/server.rs | 74 ++++- 5 files changed, 1008 insertions(+), 359 deletions(-) create mode 100644 tap_aggregator/src/aggregator/v1.rs create mode 100644 tap_aggregator/src/aggregator/v2.rs diff --git a/tap_aggregator/src/aggregator.rs b/tap_aggregator/src/aggregator.rs index 4026f063..81616731 100644 --- a/tap_aggregator/src/aggregator.rs +++ b/tap_aggregator/src/aggregator.rs @@ -1,350 +1,5 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::collections::{hash_set, HashSet}; - -use alloy::{ - dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner, - sol_types::SolStruct, -}; -use anyhow::{bail, Ok, Result}; -use rayon::prelude::*; -use tap_core::signed_message::{Eip712SignedMessage, SignatureBytes, SignatureBytesExt}; -use tap_graph::{Receipt, ReceiptAggregateVoucher}; - -pub fn check_and_aggregate_receipts( - domain_separator: &Eip712Domain, - receipts: &[Eip712SignedMessage], - previous_rav: Option>, - wallet: &PrivateKeySigner, - accepted_addresses: &HashSet
, -) -> Result> { - check_signatures_unique(receipts)?; - - // Check that the receipts are signed by an accepted signer address - receipts.par_iter().try_for_each(|receipt| { - check_signature_is_from_one_of_addresses(receipt, domain_separator, accepted_addresses) - })?; - - // Check that the previous rav is signed by an accepted signer address - if let Some(previous_rav) = &previous_rav { - check_signature_is_from_one_of_addresses( - previous_rav, - domain_separator, - accepted_addresses, - )?; - } - - // Check that the receipts timestamp is greater than the previous rav - check_receipt_timestamps(receipts, previous_rav.as_ref())?; - - // Get the allocation id from the first receipt, return error if there are no receipts - let allocation_id = match receipts.first() { - Some(receipt) => receipt.message.allocation_id, - None => return Err(tap_core::Error::NoValidReceiptsForRavRequest.into()), - }; - - // Check that the receipts all have the same allocation id - check_allocation_id(receipts, allocation_id)?; - - // Check that the rav has the correct allocation id - if let Some(previous_rav) = &previous_rav { - let prev_id = previous_rav.message.allocationId; - if prev_id != allocation_id { - return Err(tap_core::Error::RavAllocationIdMismatch { - prev_id: format!("{prev_id:#X}"), - new_id: format!("{allocation_id:#X}"), - } - .into()); - } - } - - // Aggregate the receipts - let rav = ReceiptAggregateVoucher::aggregate_receipts(allocation_id, receipts, previous_rav)?; - - // Sign the rav and return - Ok(Eip712SignedMessage::new(domain_separator, rav, wallet)?) -} - -fn check_signature_is_from_one_of_addresses( - message: &Eip712SignedMessage, - domain_separator: &Eip712Domain, - accepted_addresses: &HashSet
, -) -> Result<()> { - let recovered_address = message.recover_signer(domain_separator)?; - if !accepted_addresses.contains(&recovered_address) { - bail!(tap_core::Error::InvalidRecoveredSigner { - address: recovered_address, - }); - } - Ok(()) -} - -fn check_allocation_id( - receipts: &[Eip712SignedMessage], - allocation_id: Address, -) -> Result<()> { - for receipt in receipts.iter() { - let receipt = &receipt.message; - if receipt.allocation_id != allocation_id { - return Err(tap_core::Error::RavAllocationIdNotUniform.into()); - } - } - Ok(()) -} - -fn check_signatures_unique(receipts: &[Eip712SignedMessage]) -> Result<()> { - let mut receipt_signatures: hash_set::HashSet = hash_set::HashSet::new(); - for receipt in receipts.iter() { - let signature = receipt.signature.get_signature_bytes(); - if !receipt_signatures.insert(signature) { - return Err(tap_core::Error::DuplicateReceiptSignature(format!( - "{:?}", - receipt.signature - )) - .into()); - } - } - Ok(()) -} - -fn check_receipt_timestamps( - receipts: &[Eip712SignedMessage], - previous_rav: Option<&Eip712SignedMessage>, -) -> Result<()> { - if let Some(previous_rav) = &previous_rav { - for receipt in receipts.iter() { - let receipt = &receipt.message; - if previous_rav.message.timestampNs >= receipt.timestamp_ns { - return Err(tap_core::Error::ReceiptTimestampLowerThanRav { - rav_ts: previous_rav.message.timestampNs, - receipt_ts: receipt.timestamp_ns, - } - .into()); - } - } - } - - Ok(()) -} - -#[cfg(test)] -mod tests { - use std::str::FromStr; - - use alloy::{dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner}; - use rstest::*; - use tap_core::{signed_message::Eip712SignedMessage, tap_eip712_domain}; - use tap_graph::{Receipt, ReceiptAggregateVoucher}; - - use crate::aggregator; - - #[fixture] - fn keys() -> (PrivateKeySigner, Address) { - let wallet = PrivateKeySigner::random(); - let address = wallet.address(); - (wallet, address) - } - - #[fixture] - fn allocation_ids() -> Vec
{ - vec![ - Address::from_str("0xabababababababababababababababababababab").unwrap(), - Address::from_str("0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddead").unwrap(), - Address::from_str("0xbeefbeefbeefbeefbeefbeefbeefbeefbeefbeef").unwrap(), - Address::from_str("0x1234567890abcdef1234567890abcdef12345678").unwrap(), - ] - } - - #[fixture] - fn domain_separator() -> Eip712Domain { - tap_eip712_domain(1, Address::from([0x11u8; 20])) - } - - #[rstest] - #[test] - fn check_signatures_unique_fail( - keys: (PrivateKeySigner, Address), - allocation_ids: Vec
, - domain_separator: Eip712Domain, - ) { - // Create the same receipt twice (replay attack) - let mut receipts = Vec::new(); - let receipt = Eip712SignedMessage::new( - &domain_separator, - Receipt::new(allocation_ids[0], 42).unwrap(), - &keys.0, - ) - .unwrap(); - receipts.push(receipt.clone()); - receipts.push(receipt); - - let res = aggregator::check_signatures_unique(&receipts); - assert!(res.is_err()); - } - - #[rstest] - #[test] - fn check_signatures_unique_ok( - keys: (PrivateKeySigner, Address), - allocation_ids: Vec
, - domain_separator: Eip712Domain, - ) { - // Create 2 different receipts - let receipts = vec![ - Eip712SignedMessage::new( - &domain_separator, - Receipt::new(allocation_ids[0], 42).unwrap(), - &keys.0, - ) - .unwrap(), - Eip712SignedMessage::new( - &domain_separator, - Receipt::new(allocation_ids[0], 43).unwrap(), - &keys.0, - ) - .unwrap(), - ]; - - let res = aggregator::check_signatures_unique(&receipts); - assert!(res.is_ok()); - } - - #[rstest] - #[test] - /// Test that a receipt with a timestamp greater then the rav timestamp passes - fn check_receipt_timestamps( - keys: (PrivateKeySigner, Address), - allocation_ids: Vec
, - domain_separator: Eip712Domain, - ) { - // Create receipts with consecutive timestamps - let receipt_timestamp_range = 10..20; - let mut receipts = Vec::new(); - for i in receipt_timestamp_range.clone() { - receipts.push( - Eip712SignedMessage::new( - &domain_separator, - Receipt { - allocation_id: allocation_ids[0], - timestamp_ns: i, - nonce: 0, - value: 42, - }, - &keys.0, - ) - .unwrap(), - ); - } - - // Create rav with max_timestamp below the receipts timestamps - let rav = Eip712SignedMessage::new( - &domain_separator, - ReceiptAggregateVoucher { - allocationId: allocation_ids[0], - timestampNs: receipt_timestamp_range.clone().min().unwrap() - 1, - valueAggregate: 42, - }, - &keys.0, - ) - .unwrap(); - assert!(aggregator::check_receipt_timestamps(&receipts, Some(&rav)).is_ok()); - - // Create rav with max_timestamp equal to the lowest receipt timestamp - // Aggregation should fail - let rav = Eip712SignedMessage::new( - &domain_separator, - ReceiptAggregateVoucher { - allocationId: allocation_ids[0], - timestampNs: receipt_timestamp_range.clone().min().unwrap(), - valueAggregate: 42, - }, - &keys.0, - ) - .unwrap(); - assert!(aggregator::check_receipt_timestamps(&receipts, Some(&rav)).is_err()); - - // Create rav with max_timestamp above highest receipt timestamp - // Aggregation should fail - let rav = Eip712SignedMessage::new( - &domain_separator, - ReceiptAggregateVoucher { - allocationId: allocation_ids[0], - timestampNs: receipt_timestamp_range.clone().max().unwrap() + 1, - valueAggregate: 42, - }, - &keys.0, - ) - .unwrap(); - assert!(aggregator::check_receipt_timestamps(&receipts, Some(&rav)).is_err()); - } - - #[rstest] - #[test] - /// Test check_allocation_id with 2 receipts that have the correct allocation id - /// and 1 receipt that has the wrong allocation id - fn check_allocation_id_fail( - keys: (PrivateKeySigner, Address), - allocation_ids: Vec
, - domain_separator: Eip712Domain, - ) { - let receipts = vec![ - Eip712SignedMessage::new( - &domain_separator, - Receipt::new(allocation_ids[0], 42).unwrap(), - &keys.0, - ) - .unwrap(), - Eip712SignedMessage::new( - &domain_separator, - Receipt::new(allocation_ids[0], 43).unwrap(), - &keys.0, - ) - .unwrap(), - Eip712SignedMessage::new( - &domain_separator, - Receipt::new(allocation_ids[1], 44).unwrap(), - &keys.0, - ) - .unwrap(), - ]; - - let res = aggregator::check_allocation_id(&receipts, allocation_ids[0]); - - assert!(res.is_err()); - } - - #[rstest] - #[test] - /// Test check_allocation_id with 3 receipts that have the correct allocation id - fn check_allocation_id_ok( - keys: (PrivateKeySigner, Address), - allocation_ids: Vec
, - domain_separator: Eip712Domain, - ) { - let receipts = vec![ - Eip712SignedMessage::new( - &domain_separator, - Receipt::new(allocation_ids[0], 42).unwrap(), - &keys.0, - ) - .unwrap(), - Eip712SignedMessage::new( - &domain_separator, - Receipt::new(allocation_ids[0], 43).unwrap(), - &keys.0, - ) - .unwrap(), - Eip712SignedMessage::new( - &domain_separator, - Receipt::new(allocation_ids[0], 44).unwrap(), - &keys.0, - ) - .unwrap(), - ]; - - let res = aggregator::check_allocation_id(&receipts, allocation_ids[0]); - - assert!(res.is_ok()); - } -} +pub mod v1; +pub mod v2; diff --git a/tap_aggregator/src/aggregator/v1.rs b/tap_aggregator/src/aggregator/v1.rs new file mode 100644 index 00000000..a82d2b5c --- /dev/null +++ b/tap_aggregator/src/aggregator/v1.rs @@ -0,0 +1,350 @@ +// Copyright 2023-, Semiotic AI, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::collections::{hash_set, HashSet}; + +use alloy::{ + dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner, + sol_types::SolStruct, +}; +use anyhow::{bail, Ok, Result}; +use rayon::prelude::*; +use tap_core::signed_message::{Eip712SignedMessage, SignatureBytes, SignatureBytesExt}; +use tap_graph::{Receipt, ReceiptAggregateVoucher}; + +pub fn check_and_aggregate_receipts( + domain_separator: &Eip712Domain, + receipts: &[Eip712SignedMessage], + previous_rav: Option>, + wallet: &PrivateKeySigner, + accepted_addresses: &HashSet
, +) -> Result> { + check_signatures_unique(receipts)?; + + // Check that the receipts are signed by an accepted signer address + receipts.par_iter().try_for_each(|receipt| { + check_signature_is_from_one_of_addresses(receipt, domain_separator, accepted_addresses) + })?; + + // Check that the previous rav is signed by an accepted signer address + if let Some(previous_rav) = &previous_rav { + check_signature_is_from_one_of_addresses( + previous_rav, + domain_separator, + accepted_addresses, + )?; + } + + // Check that the receipts timestamp is greater than the previous rav + check_receipt_timestamps(receipts, previous_rav.as_ref())?; + + // Get the allocation id from the first receipt, return error if there are no receipts + let allocation_id = match receipts.first() { + Some(receipt) => receipt.message.allocation_id, + None => return Err(tap_core::Error::NoValidReceiptsForRavRequest.into()), + }; + + // Check that the receipts all have the same allocation id + check_allocation_id(receipts, allocation_id)?; + + // Check that the rav has the correct allocation id + if let Some(previous_rav) = &previous_rav { + let prev_id = previous_rav.message.allocationId; + if prev_id != allocation_id { + return Err(tap_core::Error::RavAllocationIdMismatch { + prev_id: format!("{prev_id:#X}"), + new_id: format!("{allocation_id:#X}"), + } + .into()); + } + } + + // Aggregate the receipts + let rav = ReceiptAggregateVoucher::aggregate_receipts(allocation_id, receipts, previous_rav)?; + + // Sign the rav and return + Ok(Eip712SignedMessage::new(domain_separator, rav, wallet)?) +} + +fn check_signature_is_from_one_of_addresses( + message: &Eip712SignedMessage, + domain_separator: &Eip712Domain, + accepted_addresses: &HashSet
, +) -> Result<()> { + let recovered_address = message.recover_signer(domain_separator)?; + if !accepted_addresses.contains(&recovered_address) { + bail!(tap_core::Error::InvalidRecoveredSigner { + address: recovered_address, + }); + } + Ok(()) +} + +fn check_allocation_id( + receipts: &[Eip712SignedMessage], + allocation_id: Address, +) -> Result<()> { + for receipt in receipts.iter() { + let receipt = &receipt.message; + if receipt.allocation_id != allocation_id { + return Err(tap_core::Error::RavAllocationIdNotUniform.into()); + } + } + Ok(()) +} + +fn check_signatures_unique(receipts: &[Eip712SignedMessage]) -> Result<()> { + let mut receipt_signatures: hash_set::HashSet = hash_set::HashSet::new(); + for receipt in receipts.iter() { + let signature = receipt.signature.get_signature_bytes(); + if !receipt_signatures.insert(signature) { + return Err(tap_core::Error::DuplicateReceiptSignature(format!( + "{:?}", + receipt.signature + )) + .into()); + } + } + Ok(()) +} + +fn check_receipt_timestamps( + receipts: &[Eip712SignedMessage], + previous_rav: Option<&Eip712SignedMessage>, +) -> Result<()> { + if let Some(previous_rav) = &previous_rav { + for receipt in receipts.iter() { + let receipt = &receipt.message; + if previous_rav.message.timestampNs >= receipt.timestamp_ns { + return Err(tap_core::Error::ReceiptTimestampLowerThanRav { + rav_ts: previous_rav.message.timestampNs, + receipt_ts: receipt.timestamp_ns, + } + .into()); + } + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use alloy::{dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner}; + use rstest::*; + use tap_core::{signed_message::Eip712SignedMessage, tap_eip712_domain}; + use tap_graph::{Receipt, ReceiptAggregateVoucher}; + + use super::*; + + #[fixture] + fn keys() -> (PrivateKeySigner, Address) { + let wallet = PrivateKeySigner::random(); + let address = wallet.address(); + (wallet, address) + } + + #[fixture] + fn allocation_ids() -> Vec
{ + vec![ + Address::from_str("0xabababababababababababababababababababab").unwrap(), + Address::from_str("0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddead").unwrap(), + Address::from_str("0xbeefbeefbeefbeefbeefbeefbeefbeefbeefbeef").unwrap(), + Address::from_str("0x1234567890abcdef1234567890abcdef12345678").unwrap(), + ] + } + + #[fixture] + fn domain_separator() -> Eip712Domain { + tap_eip712_domain(1, Address::from([0x11u8; 20])) + } + + #[rstest] + #[test] + fn check_signatures_unique_fail( + keys: (PrivateKeySigner, Address), + allocation_ids: Vec
, + domain_separator: Eip712Domain, + ) { + // Create the same receipt twice (replay attack) + let mut receipts = Vec::new(); + let receipt = Eip712SignedMessage::new( + &domain_separator, + Receipt::new(allocation_ids[0], 42).unwrap(), + &keys.0, + ) + .unwrap(); + receipts.push(receipt.clone()); + receipts.push(receipt); + + let res = check_signatures_unique(&receipts); + assert!(res.is_err()); + } + + #[rstest] + #[test] + fn check_signatures_unique_ok( + keys: (PrivateKeySigner, Address), + allocation_ids: Vec
, + domain_separator: Eip712Domain, + ) { + // Create 2 different receipts + let receipts = vec![ + Eip712SignedMessage::new( + &domain_separator, + Receipt::new(allocation_ids[0], 42).unwrap(), + &keys.0, + ) + .unwrap(), + Eip712SignedMessage::new( + &domain_separator, + Receipt::new(allocation_ids[0], 43).unwrap(), + &keys.0, + ) + .unwrap(), + ]; + + let res = check_signatures_unique(&receipts); + assert!(res.is_ok()); + } + + #[rstest] + #[test] + /// Test that a receipt with a timestamp greater then the rav timestamp passes + fn test_check_receipt_timestamps( + keys: (PrivateKeySigner, Address), + allocation_ids: Vec
, + domain_separator: Eip712Domain, + ) { + // Create receipts with consecutive timestamps + let receipt_timestamp_range = 10..20; + let mut receipts = Vec::new(); + for i in receipt_timestamp_range.clone() { + receipts.push( + Eip712SignedMessage::new( + &domain_separator, + Receipt { + allocation_id: allocation_ids[0], + timestamp_ns: i, + nonce: 0, + value: 42, + }, + &keys.0, + ) + .unwrap(), + ); + } + + // Create rav with max_timestamp below the receipts timestamps + let rav = Eip712SignedMessage::new( + &domain_separator, + ReceiptAggregateVoucher { + allocationId: allocation_ids[0], + timestampNs: receipt_timestamp_range.clone().min().unwrap() - 1, + valueAggregate: 42, + }, + &keys.0, + ) + .unwrap(); + assert!(check_receipt_timestamps(&receipts, Some(&rav)).is_ok()); + + // Create rav with max_timestamp equal to the lowest receipt timestamp + // Aggregation should fail + let rav = Eip712SignedMessage::new( + &domain_separator, + ReceiptAggregateVoucher { + allocationId: allocation_ids[0], + timestampNs: receipt_timestamp_range.clone().min().unwrap(), + valueAggregate: 42, + }, + &keys.0, + ) + .unwrap(); + assert!(check_receipt_timestamps(&receipts, Some(&rav)).is_err()); + + // Create rav with max_timestamp above highest receipt timestamp + // Aggregation should fail + let rav = Eip712SignedMessage::new( + &domain_separator, + ReceiptAggregateVoucher { + allocationId: allocation_ids[0], + timestampNs: receipt_timestamp_range.clone().max().unwrap() + 1, + valueAggregate: 42, + }, + &keys.0, + ) + .unwrap(); + assert!(check_receipt_timestamps(&receipts, Some(&rav)).is_err()); + } + + #[rstest] + #[test] + /// Test check_allocation_id with 2 receipts that have the correct allocation id + /// and 1 receipt that has the wrong allocation id + fn check_allocation_id_fail( + keys: (PrivateKeySigner, Address), + allocation_ids: Vec
, + domain_separator: Eip712Domain, + ) { + let receipts = vec![ + Eip712SignedMessage::new( + &domain_separator, + Receipt::new(allocation_ids[0], 42).unwrap(), + &keys.0, + ) + .unwrap(), + Eip712SignedMessage::new( + &domain_separator, + Receipt::new(allocation_ids[0], 43).unwrap(), + &keys.0, + ) + .unwrap(), + Eip712SignedMessage::new( + &domain_separator, + Receipt::new(allocation_ids[1], 44).unwrap(), + &keys.0, + ) + .unwrap(), + ]; + + let res = check_allocation_id(&receipts, allocation_ids[0]); + + assert!(res.is_err()); + } + + #[rstest] + #[test] + /// Test check_allocation_id with 3 receipts that have the correct allocation id + fn check_allocation_id_ok( + keys: (PrivateKeySigner, Address), + allocation_ids: Vec
, + domain_separator: Eip712Domain, + ) { + let receipts = vec![ + Eip712SignedMessage::new( + &domain_separator, + Receipt::new(allocation_ids[0], 42).unwrap(), + &keys.0, + ) + .unwrap(), + Eip712SignedMessage::new( + &domain_separator, + Receipt::new(allocation_ids[0], 43).unwrap(), + &keys.0, + ) + .unwrap(), + Eip712SignedMessage::new( + &domain_separator, + Receipt::new(allocation_ids[0], 44).unwrap(), + &keys.0, + ) + .unwrap(), + ]; + + let res = check_allocation_id(&receipts, allocation_ids[0]); + + assert!(res.is_ok()); + } +} diff --git a/tap_aggregator/src/aggregator/v2.rs b/tap_aggregator/src/aggregator/v2.rs new file mode 100644 index 00000000..9f3084bd --- /dev/null +++ b/tap_aggregator/src/aggregator/v2.rs @@ -0,0 +1,462 @@ +// Copyright 2023-, Semiotic AI, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::collections::{hash_set, HashSet}; + +use alloy::{ + dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner, + sol_types::SolStruct, +}; +use anyhow::{bail, Ok, Result}; +use rayon::prelude::*; +use tap_core::signed_message::{Eip712SignedMessage, SignatureBytes, SignatureBytesExt}; +use tap_graph::v2::{Receipt, ReceiptAggregateVoucher}; + +pub fn check_and_aggregate_receipts( + domain_separator: &Eip712Domain, + receipts: &[Eip712SignedMessage], + previous_rav: Option>, + wallet: &PrivateKeySigner, + accepted_addresses: &HashSet
, +) -> Result> { + check_signatures_unique(receipts)?; + + // Check that the receipts are signed by an accepted signer address + receipts.par_iter().try_for_each(|receipt| { + check_signature_is_from_one_of_addresses(receipt, domain_separator, accepted_addresses) + })?; + + // Check that the previous rav is signed by an accepted signer address + if let Some(previous_rav) = &previous_rav { + check_signature_is_from_one_of_addresses( + previous_rav, + domain_separator, + accepted_addresses, + )?; + } + + // Check that the receipts timestamp is greater than the previous rav + check_receipt_timestamps(receipts, previous_rav.as_ref())?; + + // Get the allocation id from the first receipt, return error if there are no receipts + let (allocation_id, payer, data_service, service_provider) = match receipts.first() { + Some(receipt) => ( + receipt.message.allocation_id, + receipt.message.payer, + receipt.message.data_service, + receipt.message.service_provider, + ), + None => return Err(tap_core::Error::NoValidReceiptsForRavRequest.into()), + }; + + // Check that the receipts all have the same allocation id + check_allocation_id( + receipts, + allocation_id, + payer, + data_service, + service_provider, + )?; + + // Check that the rav has the correct allocation id + if let Some(previous_rav) = &previous_rav { + let prev_id = previous_rav.message.allocationId; + let prev_payer = previous_rav.message.payer; + let prev_data_service = previous_rav.message.dataService; + let prev_service_provider = previous_rav.message.serviceProvider; + if prev_id != allocation_id { + return Err(tap_core::Error::RavAllocationIdMismatch { + prev_id: format!("{prev_id:#X}"), + new_id: format!("{allocation_id:#X}"), + } + .into()); + } + if prev_payer != payer { + return Err(tap_core::Error::RavAllocationIdMismatch { + prev_id: format!("{prev_id:#X}"), + new_id: format!("{allocation_id:#X}"), + } + .into()); + } + + if prev_data_service != data_service { + return Err(tap_core::Error::RavAllocationIdMismatch { + prev_id: format!("{prev_id:#X}"), + new_id: format!("{allocation_id:#X}"), + } + .into()); + } + if prev_service_provider != service_provider { + return Err(tap_core::Error::RavAllocationIdMismatch { + prev_id: format!("{prev_id:#X}"), + new_id: format!("{allocation_id:#X}"), + } + .into()); + } + } + + // Aggregate the receipts + let rav = ReceiptAggregateVoucher::aggregate_receipts( + allocation_id, + payer, + data_service, + service_provider, + receipts, + previous_rav, + )?; + + // Sign the rav and return + Ok(Eip712SignedMessage::new(domain_separator, rav, wallet)?) +} + +fn check_signature_is_from_one_of_addresses( + message: &Eip712SignedMessage, + domain_separator: &Eip712Domain, + accepted_addresses: &HashSet
, +) -> Result<()> { + let recovered_address = message.recover_signer(domain_separator)?; + if !accepted_addresses.contains(&recovered_address) { + bail!(tap_core::Error::InvalidRecoveredSigner { + address: recovered_address, + }); + } + Ok(()) +} + +fn check_allocation_id( + receipts: &[Eip712SignedMessage], + allocation_id: Address, + payer: Address, + data_service: Address, + service_provider: Address, +) -> Result<()> { + for receipt in receipts.iter() { + let receipt = &receipt.message; + if receipt.allocation_id != allocation_id { + return Err(tap_core::Error::RavAllocationIdNotUniform.into()); + } + if receipt.payer != payer { + return Err(tap_core::Error::RavAllocationIdNotUniform.into()); + } + if receipt.data_service != data_service { + return Err(tap_core::Error::RavAllocationIdNotUniform.into()); + } + if receipt.service_provider != service_provider { + return Err(tap_core::Error::RavAllocationIdNotUniform.into()); + } + } + Ok(()) +} + +fn check_signatures_unique(receipts: &[Eip712SignedMessage]) -> Result<()> { + let mut receipt_signatures: hash_set::HashSet = hash_set::HashSet::new(); + for receipt in receipts.iter() { + let signature = receipt.signature.get_signature_bytes(); + if !receipt_signatures.insert(signature) { + return Err(tap_core::Error::DuplicateReceiptSignature(format!( + "{:?}", + receipt.signature + )) + .into()); + } + } + Ok(()) +} + +fn check_receipt_timestamps( + receipts: &[Eip712SignedMessage], + previous_rav: Option<&Eip712SignedMessage>, +) -> Result<()> { + if let Some(previous_rav) = &previous_rav { + for receipt in receipts.iter() { + let receipt = &receipt.message; + if previous_rav.message.timestampNs >= receipt.timestamp_ns { + return Err(tap_core::Error::ReceiptTimestampLowerThanRav { + rav_ts: previous_rav.message.timestampNs, + receipt_ts: receipt.timestamp_ns, + } + .into()); + } + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use alloy::{ + dyn_abi::Eip712Domain, + primitives::{address, Address, Bytes}, + signers::local::PrivateKeySigner, + }; + use rstest::*; + use tap_core::{signed_message::Eip712SignedMessage, tap_eip712_domain}; + use tap_graph::v2::{Receipt, ReceiptAggregateVoucher}; + + #[fixture] + fn keys() -> (PrivateKeySigner, Address) { + let wallet = PrivateKeySigner::random(); + let address = wallet.address(); + (wallet, address) + } + + #[fixture] + fn allocation_id() -> Address { + address!("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef") + } + + #[fixture] + fn payer() -> Address { + address!("abababababababababababababababababababab") + } + + #[fixture] + fn data_service() -> Address { + address!("deaddeaddeaddeaddeaddeaddeaddeaddeaddead") + } + + #[fixture] + fn service_provider() -> Address { + address!("beefbeefbeefbeefbeefbeefbeefbeefbeefbeef") + } + + #[fixture] + fn other_address() -> Address { + address!("1234567890abcdef1234567890abcdef12345678") + } + #[fixture] + fn domain_separator() -> Eip712Domain { + tap_eip712_domain(1, Address::from([0x11u8; 20])) + } + + #[rstest] + #[test] + fn check_signatures_unique_fail( + keys: (PrivateKeySigner, Address), + allocation_id: Address, + payer: Address, + data_service: Address, + service_provider: Address, + domain_separator: Eip712Domain, + ) { + // Create the same receipt twice (replay attack) + let mut receipts = Vec::new(); + let receipt = Eip712SignedMessage::new( + &domain_separator, + Receipt::new(allocation_id, payer, data_service, service_provider, 42).unwrap(), + &keys.0, + ) + .unwrap(); + receipts.push(receipt.clone()); + receipts.push(receipt); + + let res = super::check_signatures_unique(&receipts); + assert!(res.is_err()); + } + + #[rstest] + #[test] + fn check_signatures_unique_ok( + keys: (PrivateKeySigner, Address), + allocation_id: Address, + payer: Address, + data_service: Address, + service_provider: Address, + domain_separator: Eip712Domain, + ) { + // Create 2 different receipts + let receipts = vec![ + Eip712SignedMessage::new( + &domain_separator, + Receipt::new(allocation_id, payer, data_service, service_provider, 42).unwrap(), + &keys.0, + ) + .unwrap(), + Eip712SignedMessage::new( + &domain_separator, + Receipt::new(allocation_id, payer, data_service, service_provider, 42).unwrap(), + &keys.0, + ) + .unwrap(), + ]; + + let res = super::check_signatures_unique(&receipts); + assert!(res.is_ok()); + } + + #[rstest] + #[test] + /// Test that a receipt with a timestamp greater then the rav timestamp passes + fn check_receipt_timestamps( + keys: (PrivateKeySigner, Address), + allocation_id: Address, + payer: Address, + data_service: Address, + service_provider: Address, + domain_separator: Eip712Domain, + ) { + // Create receipts with consecutive timestamps + let receipt_timestamp_range = 10..20; + let mut receipts = Vec::new(); + for i in receipt_timestamp_range.clone() { + receipts.push( + Eip712SignedMessage::new( + &domain_separator, + Receipt { + allocation_id, + payer, + data_service, + service_provider, + timestamp_ns: i, + nonce: 0, + value: 42, + }, + &keys.0, + ) + .unwrap(), + ); + } + + // Create rav with max_timestamp below the receipts timestamps + let rav = Eip712SignedMessage::new( + &domain_separator, + ReceiptAggregateVoucher { + allocationId: allocation_id, + dataService: data_service, + payer, + serviceProvider: service_provider, + timestampNs: receipt_timestamp_range.clone().min().unwrap() - 1, + valueAggregate: 42, + metadata: Bytes::new(), + }, + &keys.0, + ) + .unwrap(); + assert!(super::check_receipt_timestamps(&receipts, Some(&rav)).is_ok()); + + // Create rav with max_timestamp equal to the lowest receipt timestamp + // Aggregation should fail + let rav = Eip712SignedMessage::new( + &domain_separator, + ReceiptAggregateVoucher { + allocationId: allocation_id, + dataService: data_service, + payer, + serviceProvider: service_provider, + timestampNs: receipt_timestamp_range.clone().min().unwrap(), + valueAggregate: 42, + metadata: Bytes::new(), + }, + &keys.0, + ) + .unwrap(); + assert!(super::check_receipt_timestamps(&receipts, Some(&rav)).is_err()); + + // Create rav with max_timestamp above highest receipt timestamp + // Aggregation should fail + let rav = Eip712SignedMessage::new( + &domain_separator, + ReceiptAggregateVoucher { + allocationId: allocation_id, + dataService: data_service, + payer, + serviceProvider: service_provider, + timestampNs: receipt_timestamp_range.clone().max().unwrap() + 1, + valueAggregate: 42, + metadata: Bytes::new(), + }, + &keys.0, + ) + .unwrap(); + assert!(super::check_receipt_timestamps(&receipts, Some(&rav)).is_err()); + } + + #[rstest] + #[test] + /// Test check_allocation_id with 2 receipts that have the correct allocation id + /// and 1 receipt that has the wrong allocation id + fn check_allocation_id_fail( + keys: (PrivateKeySigner, Address), + allocation_id: Address, + payer: Address, + data_service: Address, + service_provider: Address, + other_address: Address, + domain_separator: Eip712Domain, + ) { + let receipts = vec![ + Eip712SignedMessage::new( + &domain_separator, + Receipt::new(allocation_id, payer, data_service, service_provider, 42).unwrap(), + &keys.0, + ) + .unwrap(), + Eip712SignedMessage::new( + &domain_separator, + Receipt::new(allocation_id, payer, data_service, service_provider, 43).unwrap(), + &keys.0, + ) + .unwrap(), + Eip712SignedMessage::new( + &domain_separator, + Receipt::new(other_address, payer, data_service, service_provider, 44).unwrap(), + &keys.0, + ) + .unwrap(), + ]; + + let res = super::check_allocation_id( + &receipts, + allocation_id, + payer, + data_service, + service_provider, + ); + + assert!(res.is_err()); + } + + #[rstest] + #[test] + /// Test check_allocation_id with 3 receipts that have the correct allocation id + fn check_allocation_id_ok( + keys: (PrivateKeySigner, Address), + allocation_id: Address, + payer: Address, + data_service: Address, + service_provider: Address, + domain_separator: Eip712Domain, + ) { + let receipts = vec![ + Eip712SignedMessage::new( + &domain_separator, + Receipt::new(allocation_id, payer, data_service, service_provider, 42).unwrap(), + &keys.0, + ) + .unwrap(), + Eip712SignedMessage::new( + &domain_separator, + Receipt::new(allocation_id, payer, data_service, service_provider, 43).unwrap(), + &keys.0, + ) + .unwrap(), + Eip712SignedMessage::new( + &domain_separator, + Receipt::new(allocation_id, payer, data_service, service_provider, 44).unwrap(), + &keys.0, + ) + .unwrap(), + ]; + + let res = super::check_allocation_id( + &receipts, + allocation_id, + payer, + data_service, + service_provider, + ); + + assert!(res.is_ok()); + } +} diff --git a/tap_aggregator/src/grpc.rs b/tap_aggregator/src/grpc.rs index 3922e882..9e6fe584 100644 --- a/tap_aggregator/src/grpc.rs +++ b/tap_aggregator/src/grpc.rs @@ -141,5 +141,137 @@ pub mod v1 { } pub mod v2 { + use alloy::primitives::Bytes; + use anyhow::anyhow; + use tap_core::signed_message::Eip712SignedMessage; + tonic::include_proto!("tap_aggregator.v2"); + + impl TryFrom for tap_graph::v2::Receipt { + type Error = anyhow::Error; + fn try_from(receipt: self::Receipt) -> Result { + Ok(Self { + allocation_id: receipt.allocation_id.as_slice().try_into()?, + timestamp_ns: receipt.timestamp_ns, + value: receipt.value.ok_or(anyhow!("Missing value"))?.into(), + nonce: receipt.nonce, + payer: receipt.payer.as_slice().try_into()?, + data_service: receipt.data_service.as_slice().try_into()?, + service_provider: receipt.service_provider.as_slice().try_into()?, + }) + } + } + + impl TryFrom for tap_graph::v2::SignedReceipt { + type Error = anyhow::Error; + fn try_from(receipt: self::SignedReceipt) -> Result { + Ok(Self { + signature: receipt.signature.as_slice().try_into()?, + message: receipt + .message + .ok_or(anyhow!("Missing message"))? + .try_into()?, + }) + } + } + + impl From for self::Receipt { + fn from(value: tap_graph::v2::Receipt) -> Self { + Self { + allocation_id: value.allocation_id.as_slice().to_vec(), + timestamp_ns: value.timestamp_ns, + nonce: value.nonce, + value: Some(value.value.into()), + payer: value.payer.as_slice().to_vec(), + data_service: value.data_service.as_slice().to_vec(), + service_provider: value.service_provider.as_slice().to_vec(), + } + } + } + + impl From for self::SignedReceipt { + fn from(value: tap_graph::v2::SignedReceipt) -> Self { + Self { + message: Some(value.message.into()), + signature: value.signature.as_bytes().to_vec(), + } + } + } + + impl TryFrom for Eip712SignedMessage { + type Error = anyhow::Error; + fn try_from(voucher: self::SignedRav) -> Result { + Ok(Self { + signature: voucher.signature.as_slice().try_into()?, + message: voucher + .message + .ok_or(anyhow!("Missing message"))? + .try_into()?, + }) + } + } + + impl From> for self::SignedRav { + fn from(voucher: Eip712SignedMessage) -> Self { + Self { + signature: voucher.signature.as_bytes().to_vec(), + message: Some(voucher.message.into()), + } + } + } + + impl TryFrom for tap_graph::v2::ReceiptAggregateVoucher { + type Error = anyhow::Error; + fn try_from(voucher: self::ReceiptAggregateVoucher) -> Result { + Ok(Self { + allocationId: voucher.allocation_id.as_slice().try_into()?, + timestampNs: voucher.timestamp_ns, + valueAggregate: voucher + .value_aggregate + .ok_or(anyhow!("Missing Value Aggregate"))? + .into(), + payer: voucher.payer.as_slice().try_into()?, + dataService: voucher.data_service.as_slice().try_into()?, + serviceProvider: voucher.service_provider.as_slice().try_into()?, + metadata: Bytes::copy_from_slice(voucher.metadata.as_slice()), + }) + } + } + + impl From for self::ReceiptAggregateVoucher { + fn from(voucher: tap_graph::v2::ReceiptAggregateVoucher) -> Self { + Self { + allocation_id: voucher.allocationId.to_vec(), + timestamp_ns: voucher.timestampNs, + value_aggregate: Some(voucher.valueAggregate.into()), + payer: voucher.payer.to_vec(), + data_service: voucher.dataService.to_vec(), + service_provider: voucher.serviceProvider.to_vec(), + metadata: voucher.metadata.to_vec(), + } + } + } + + impl self::RavRequest { + pub fn new( + receipts: Vec, + previous_rav: Option, + ) -> Self { + Self { + receipts: receipts.into_iter().map(Into::into).collect(), + previous_rav: previous_rav.map(Into::into), + } + } + } + + impl self::RavResponse { + pub fn signed_rav(mut self) -> anyhow::Result { + let signed_rav: tap_graph::v2::SignedRav = self + .rav + .take() + .ok_or(anyhow!("Couldn't find rav"))? + .try_into()?; + Ok(signed_rav) + } + } } diff --git a/tap_aggregator/src/server.rs b/tap_aggregator/src/server.rs index 4d0b771c..f4520e01 100644 --- a/tap_aggregator/src/server.rs +++ b/tap_aggregator/src/server.rs @@ -21,16 +21,13 @@ use tonic::{codec::CompressionEncoding, service::Routes, Request, Response, Stat use tower::{layer::util::Identity, make::Shared}; use crate::{ - aggregator::check_and_aggregate_receipts, + aggregator, api_versioning::{ tap_rpc_api_versions_info, TapRpcApiVersion, TapRpcApiVersionsInfo, TAP_RPC_API_VERSIONS_DEPRECATED, }, error_codes::{JsonRpcErrorCode, JsonRpcWarningCode}, - grpc::v1::{ - tap_aggregator_server::{TapAggregator, TapAggregatorServer}, - RavRequest, RavResponse, - }, + grpc::{v1, v2}, jsonrpsee_helpers::{JsonRpcError, JsonRpcResponse, JsonRpcResult, JsonRpcWarning}, }; @@ -154,7 +151,7 @@ fn aggregate_receipts_( } let res = match api_version { - TapRpcApiVersion::V0_0 => check_and_aggregate_receipts( + TapRpcApiVersion::V0_0 => aggregator::v1::check_and_aggregate_receipts( domain_separator, &receipts, previous_rav, @@ -175,11 +172,11 @@ fn aggregate_receipts_( } #[tonic::async_trait] -impl TapAggregator for RpcImpl { +impl v1::tap_aggregator_server::TapAggregator for RpcImpl { async fn aggregate_receipts( &self, - request: Request, - ) -> Result, Status> { + request: Request, + ) -> Result, Status> { let rav_request = request.into_inner(); let receipts: Vec = rav_request .receipts @@ -197,7 +194,55 @@ impl TapAggregator for RpcImpl { let receipts_grt: u128 = receipts.iter().map(|r| r.message.value).sum(); let receipts_count: u64 = receipts.len() as u64; - match check_and_aggregate_receipts( + match aggregator::v1::check_and_aggregate_receipts( + &self.domain_separator, + receipts.as_slice(), + previous_rav, + &self.wallet, + &self.accepted_addresses, + ) { + Ok(res) => { + TOTAL_GRT_AGGREGATED.inc_by(receipts_grt as f64); + TOTAL_AGGREGATED_RECEIPTS.inc_by(receipts_count); + AGGREGATION_SUCCESS_COUNTER.inc(); + + let response = v1::RavResponse { + rav: Some(res.into()), + }; + Ok(Response::new(response)) + } + Err(e) => { + AGGREGATION_FAILURE_COUNTER.inc(); + Err(Status::failed_precondition(e.to_string())) + } + } + } +} + +#[tonic::async_trait] +impl v2::tap_aggregator_server::TapAggregator for RpcImpl { + async fn aggregate_receipts( + &self, + request: Request, + ) -> Result, Status> { + let rav_request = request.into_inner(); + let receipts: Vec = rav_request + .receipts + .into_iter() + .map(TryFrom::try_from) + .collect::>() + .map_err(|_| Status::invalid_argument("Error while getting list of signed_receipts"))?; + + let previous_rav = rav_request + .previous_rav + .map(TryFrom::try_from) + .transpose() + .map_err(|_| Status::invalid_argument("Error while getting previous rav"))?; + + let receipts_grt: u128 = receipts.iter().map(|r| r.message.value).sum(); + let receipts_count: u64 = receipts.len() as u64; + + match aggregator::v2::check_and_aggregate_receipts( &self.domain_separator, receipts.as_slice(), previous_rav, @@ -209,7 +254,7 @@ impl TapAggregator for RpcImpl { TOTAL_AGGREGATED_RECEIPTS.inc_by(receipts_count); AGGREGATION_SUCCESS_COUNTER.inc(); - let response = RavResponse { + let response = v2::RavResponse { rav: Some(res.into()), }; Ok(Response::new(response)) @@ -357,7 +402,12 @@ async fn shutdown_handler() { fn create_grpc_service(rpc_impl: RpcImpl) -> Result { let grpc_service = Routes::new( - TapAggregatorServer::new(rpc_impl).accept_compressed(CompressionEncoding::Zstd), + v1::tap_aggregator_server::TapAggregatorServer::new(rpc_impl.clone()) + .accept_compressed(CompressionEncoding::Zstd), + ) + .add_service( + v2::tap_aggregator_server::TapAggregatorServer::new(rpc_impl) + .accept_compressed(CompressionEncoding::Zstd), ) .prepare(); From 726b45d4b33a003470dd9c0699576a52ed65ace5 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 29 Jan 2025 21:44:43 +0100 Subject: [PATCH 4/4] test: add test to verify if both are working Signed-off-by: Gustavo Inacio --- tap_aggregator/tests/aggregate_v1_and_v2.rs | 99 +++++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 tap_aggregator/tests/aggregate_v1_and_v2.rs diff --git a/tap_aggregator/tests/aggregate_v1_and_v2.rs b/tap_aggregator/tests/aggregate_v1_and_v2.rs new file mode 100644 index 00000000..ade1a4f9 --- /dev/null +++ b/tap_aggregator/tests/aggregate_v1_and_v2.rs @@ -0,0 +1,99 @@ +// Copyright 2023-, Semiotic AI, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::collections::HashSet; + +use alloy::{ + primitives::{address, Address}, + signers::local::PrivateKeySigner, +}; +use tap_aggregator::{ + grpc::{ + v1::{tap_aggregator_client::TapAggregatorClient as ClientV1, RavRequest as ReqV1}, + v2::{tap_aggregator_client::TapAggregatorClient as ClientV2, RavRequest as ReqV2}, + }, + server, +}; +use tap_core::{signed_message::Eip712SignedMessage, tap_eip712_domain}; +use tap_graph::{v2::Receipt as ReceiptV2, Receipt as ReceiptV1}; +use tonic::codec::CompressionEncoding; + +#[tokio::test] +async fn aggregation_test() { + let domain_separator = tap_eip712_domain(1, Address::ZERO); + + let wallet = PrivateKeySigner::random(); + + let max_request_body_size = 1024 * 100; + let max_response_body_size = 1024 * 100; + let max_concurrent_connections = 1; + + let accepted_addresses = HashSet::from([wallet.address()]); + + let (_, local_addr) = server::run_server( + 0, + wallet.clone(), + accepted_addresses, + domain_separator.clone(), + max_request_body_size, + max_response_body_size, + max_concurrent_connections, + ) + .await + .unwrap(); + + let endpoint = format!("http://127.0.0.1:{}", local_addr.port()); + + let mut client = ClientV1::connect(endpoint.clone()) + .await + .unwrap() + .send_compressed(CompressionEncoding::Zstd); + + let allocation_id = address!("abababababababababababababababababababab"); + + // Create receipts + let mut receipts = Vec::new(); + for value in 50..60 { + receipts.push( + Eip712SignedMessage::new( + &domain_separator, + ReceiptV1::new(allocation_id, value).unwrap(), + &wallet, + ) + .unwrap(), + ); + } + + let rav_request = ReqV1::new(receipts.clone(), None); + let res = client.aggregate_receipts(rav_request).await; + + assert!(res.is_ok()); + + let mut client = ClientV2::connect(endpoint.clone()) + .await + .unwrap() + .send_compressed(CompressionEncoding::Zstd); + + let payer = address!("abababababababababababababababababababab"); + let data_service = address!("abababababababababababababababababababab"); + let service_provider = address!("abababababababababababababababababababab"); + + // Create receipts + let mut receipts = Vec::new(); + for value in 50..60 { + receipts.push( + Eip712SignedMessage::new( + &domain_separator, + ReceiptV2::new(allocation_id, payer, data_service, service_provider, value) + .unwrap(), + &wallet, + ) + .unwrap(), + ); + } + + let rav_request = ReqV2::new(receipts.clone(), None); + let res = client.aggregate_receipts(rav_request).await; + + assert!(res.is_ok()); +}