diff --git a/tap_aggregator/proto/v2.proto b/tap_aggregator/proto/v2.proto index 86d11147..89722ee1 100644 --- a/tap_aggregator/proto/v2.proto +++ b/tap_aggregator/proto/v2.proto @@ -7,7 +7,7 @@ package tap_aggregator.v2; import "uint128.proto"; message Receipt { - bytes allocation_id = 1; + bytes collection_id = 1; bytes payer = 2; bytes data_service = 3; bytes service_provider = 4; @@ -22,7 +22,7 @@ message SignedReceipt { } message ReceiptAggregateVoucher { - bytes allocation_id = 1; + bytes collection_id = 1; bytes payer = 2; bytes data_service = 3; bytes service_provider = 4; diff --git a/tap_aggregator/src/aggregator/v2.rs b/tap_aggregator/src/aggregator/v2.rs index 484ad4e8..c5f15ace 100644 --- a/tap_aggregator/src/aggregator/v2.rs +++ b/tap_aggregator/src/aggregator/v2.rs @@ -8,7 +8,9 @@ use rayon::prelude::*; use tap_core::{receipt::WithUniqueId, signed_message::Eip712SignedMessage}; use tap_graph::v2::{Receipt, ReceiptAggregateVoucher}; use thegraph_core::alloy::{ - dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner, + dyn_abi::Eip712Domain, + primitives::{Address, FixedBytes}, + signers::local::PrivateKeySigner, sol_types::SolStruct, }; @@ -39,9 +41,9 @@ pub fn check_and_aggregate_receipts( check_receipt_timestamps(receipts, previous_rav.as_ref())?; // Get the allocation id from the first receipt, return error if there are no receipts - let (allocation_id, payer, data_service, service_provider) = match receipts.first() { + let (collection_id, payer, data_service, service_provider) = match receipts.first() { Some(receipt) => ( - receipt.message.allocation_id, + receipt.message.collection_id, receipt.message.payer, receipt.message.data_service, receipt.message.service_provider, @@ -49,10 +51,10 @@ pub fn check_and_aggregate_receipts( None => return Err(tap_core::Error::NoValidReceiptsForRavRequest.into()), }; - // Check that the receipts all have the same allocation id - check_allocation_id( + // Check that the receipts all have the same collection id + check_collection_id( receipts, - allocation_id, + collection_id, payer, data_service, service_provider, @@ -60,21 +62,21 @@ pub fn check_and_aggregate_receipts( // Check that the rav has the correct allocation id if let Some(previous_rav) = &previous_rav { - let prev_id = previous_rav.message.allocationId; + let prev_id = previous_rav.message.collectionId; let prev_payer = previous_rav.message.payer; let prev_data_service = previous_rav.message.dataService; let prev_service_provider = previous_rav.message.serviceProvider; - if prev_id != allocation_id { + if prev_id != collection_id { return Err(tap_core::Error::RavAllocationIdMismatch { prev_id: format!("{prev_id:#X}"), - new_id: format!("{allocation_id:#X}"), + new_id: format!("{collection_id:#X}"), } .into()); } if prev_payer != payer { return Err(tap_core::Error::RavAllocationIdMismatch { prev_id: format!("{prev_id:#X}"), - new_id: format!("{allocation_id:#X}"), + new_id: format!("{collection_id:#X}"), } .into()); } @@ -82,14 +84,14 @@ pub fn check_and_aggregate_receipts( if prev_data_service != data_service { return Err(tap_core::Error::RavAllocationIdMismatch { prev_id: format!("{prev_id:#X}"), - new_id: format!("{allocation_id:#X}"), + new_id: format!("{collection_id:#X}"), } .into()); } if prev_service_provider != service_provider { return Err(tap_core::Error::RavAllocationIdMismatch { prev_id: format!("{prev_id:#X}"), - new_id: format!("{allocation_id:#X}"), + new_id: format!("{collection_id:#X}"), } .into()); } @@ -97,7 +99,7 @@ pub fn check_and_aggregate_receipts( // Aggregate the receipts let rav = ReceiptAggregateVoucher::aggregate_receipts( - allocation_id, + collection_id, payer, data_service, service_provider, @@ -123,16 +125,16 @@ fn check_signature_is_from_one_of_addresses( Ok(()) } -fn check_allocation_id( +fn check_collection_id( receipts: &[Eip712SignedMessage], - allocation_id: Address, + collection_id: FixedBytes<32>, payer: Address, data_service: Address, service_provider: Address, ) -> Result<()> { for receipt in receipts.iter() { let receipt = &receipt.message; - if receipt.allocation_id != allocation_id { + if receipt.collection_id != collection_id { return Err(tap_core::Error::RavAllocationIdNotUniform.into()); } if receipt.payer != payer { @@ -190,7 +192,7 @@ mod tests { use tap_graph::v2::{Receipt, ReceiptAggregateVoucher}; use thegraph_core::alloy::{ dyn_abi::Eip712Domain, - primitives::{address, Address, Bytes}, + primitives::{address, fixed_bytes, Address, Bytes, FixedBytes}, signers::local::PrivateKeySigner, }; @@ -202,8 +204,8 @@ mod tests { } #[fixture] - fn allocation_id() -> Address { - address!("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef") + fn collection_id() -> FixedBytes<32> { + fixed_bytes!("deaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddead") } #[fixture] @@ -222,8 +224,8 @@ mod tests { } #[fixture] - fn other_address() -> Address { - address!("1234567890abcdef1234567890abcdef12345678") + fn other_collection_id() -> FixedBytes<32> { + fixed_bytes!("1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef") } #[fixture] fn domain_separator() -> Eip712Domain { @@ -234,7 +236,7 @@ mod tests { #[test] fn check_signatures_unique_fail( keys: (PrivateKeySigner, Address), - allocation_id: Address, + collection_id: FixedBytes<32>, payer: Address, data_service: Address, service_provider: Address, @@ -244,7 +246,7 @@ mod tests { let mut receipts = Vec::new(); let receipt = Eip712SignedMessage::new( &domain_separator, - Receipt::new(allocation_id, payer, data_service, service_provider, 42).unwrap(), + Receipt::new(collection_id, payer, data_service, service_provider, 42).unwrap(), &keys.0, ) .unwrap(); @@ -259,7 +261,7 @@ mod tests { #[test] fn check_signatures_unique_ok( keys: (PrivateKeySigner, Address), - allocation_id: Address, + collection_id: FixedBytes<32>, payer: Address, data_service: Address, service_provider: Address, @@ -269,13 +271,13 @@ mod tests { let receipts = vec![ Eip712SignedMessage::new( &domain_separator, - Receipt::new(allocation_id, payer, data_service, service_provider, 42).unwrap(), + Receipt::new(collection_id, payer, data_service, service_provider, 42).unwrap(), &keys.0, ) .unwrap(), Eip712SignedMessage::new( &domain_separator, - Receipt::new(allocation_id, payer, data_service, service_provider, 42).unwrap(), + Receipt::new(collection_id, payer, data_service, service_provider, 42).unwrap(), &keys.0, ) .unwrap(), @@ -290,7 +292,7 @@ mod tests { /// Test that a receipt with a timestamp greater than the rav timestamp passes fn check_receipt_timestamps( keys: (PrivateKeySigner, Address), - allocation_id: Address, + collection_id: FixedBytes<32>, payer: Address, data_service: Address, service_provider: Address, @@ -304,7 +306,7 @@ mod tests { Eip712SignedMessage::new( &domain_separator, Receipt { - allocation_id, + collection_id, payer, data_service, service_provider, @@ -322,7 +324,7 @@ mod tests { let rav = Eip712SignedMessage::new( &domain_separator, ReceiptAggregateVoucher { - allocationId: allocation_id, + collectionId: collection_id, dataService: data_service, payer, serviceProvider: service_provider, @@ -340,7 +342,7 @@ mod tests { let rav = Eip712SignedMessage::new( &domain_separator, ReceiptAggregateVoucher { - allocationId: allocation_id, + collectionId: collection_id, dataService: data_service, payer, serviceProvider: service_provider, @@ -358,7 +360,7 @@ mod tests { let rav = Eip712SignedMessage::new( &domain_separator, ReceiptAggregateVoucher { - allocationId: allocation_id, + collectionId: collection_id, dataService: data_service, payer, serviceProvider: service_provider, @@ -378,37 +380,44 @@ mod tests { /// and 1 receipt that has the wrong allocation id fn check_allocation_id_fail( keys: (PrivateKeySigner, Address), - allocation_id: Address, + collection_id: FixedBytes<32>, payer: Address, data_service: Address, service_provider: Address, - other_address: Address, + other_collection_id: FixedBytes<32>, domain_separator: Eip712Domain, ) { let receipts = vec![ Eip712SignedMessage::new( &domain_separator, - Receipt::new(allocation_id, payer, data_service, service_provider, 42).unwrap(), + Receipt::new(collection_id, payer, data_service, service_provider, 42).unwrap(), &keys.0, ) .unwrap(), Eip712SignedMessage::new( &domain_separator, - Receipt::new(allocation_id, payer, data_service, service_provider, 43).unwrap(), + Receipt::new(collection_id, payer, data_service, service_provider, 43).unwrap(), &keys.0, ) .unwrap(), Eip712SignedMessage::new( &domain_separator, - Receipt::new(other_address, payer, data_service, service_provider, 44).unwrap(), + Receipt::new( + other_collection_id, + payer, + data_service, + service_provider, + 44, + ) + .unwrap(), &keys.0, ) .unwrap(), ]; - let res = super::check_allocation_id( + let res = super::check_collection_id( &receipts, - allocation_id, + collection_id, payer, data_service, service_provider, @@ -422,7 +431,7 @@ mod tests { /// Test check_allocation_id with 3 receipts that have the correct allocation id fn check_allocation_id_ok( keys: (PrivateKeySigner, Address), - allocation_id: Address, + collection_id: FixedBytes<32>, payer: Address, data_service: Address, service_provider: Address, @@ -431,27 +440,27 @@ mod tests { let receipts = vec![ Eip712SignedMessage::new( &domain_separator, - Receipt::new(allocation_id, payer, data_service, service_provider, 42).unwrap(), + Receipt::new(collection_id, payer, data_service, service_provider, 42).unwrap(), &keys.0, ) .unwrap(), Eip712SignedMessage::new( &domain_separator, - Receipt::new(allocation_id, payer, data_service, service_provider, 43).unwrap(), + Receipt::new(collection_id, payer, data_service, service_provider, 43).unwrap(), &keys.0, ) .unwrap(), Eip712SignedMessage::new( &domain_separator, - Receipt::new(allocation_id, payer, data_service, service_provider, 44).unwrap(), + Receipt::new(collection_id, payer, data_service, service_provider, 44).unwrap(), &keys.0, ) .unwrap(), ]; - let res = super::check_allocation_id( + let res = super::check_collection_id( &receipts, - allocation_id, + collection_id, payer, data_service, service_provider, diff --git a/tap_aggregator/src/grpc.rs b/tap_aggregator/src/grpc.rs index be1261c5..2efa0618 100644 --- a/tap_aggregator/src/grpc.rs +++ b/tap_aggregator/src/grpc.rs @@ -151,7 +151,7 @@ pub mod v2 { type Error = anyhow::Error; fn try_from(receipt: self::Receipt) -> Result { Ok(Self { - allocation_id: receipt.allocation_id.as_slice().try_into()?, + collection_id: receipt.collection_id.as_slice().try_into()?, timestamp_ns: receipt.timestamp_ns, value: receipt.value.ok_or(anyhow!("Missing value"))?.into(), nonce: receipt.nonce, @@ -178,7 +178,7 @@ pub mod v2 { impl From for self::Receipt { fn from(value: tap_graph::v2::Receipt) -> Self { Self { - allocation_id: value.allocation_id.as_slice().to_vec(), + collection_id: value.collection_id.as_slice().to_vec(), timestamp_ns: value.timestamp_ns, nonce: value.nonce, value: Some(value.value.into()), @@ -224,7 +224,7 @@ pub mod v2 { type Error = anyhow::Error; fn try_from(voucher: self::ReceiptAggregateVoucher) -> Result { Ok(Self { - allocationId: voucher.allocation_id.as_slice().try_into()?, + collectionId: voucher.collection_id.as_slice().try_into()?, timestampNs: voucher.timestamp_ns, valueAggregate: voucher .value_aggregate @@ -241,7 +241,7 @@ pub mod v2 { impl From for self::ReceiptAggregateVoucher { fn from(voucher: tap_graph::v2::ReceiptAggregateVoucher) -> Self { Self { - allocation_id: voucher.allocationId.to_vec(), + collection_id: voucher.collectionId.to_vec(), timestamp_ns: voucher.timestampNs, value_aggregate: Some(voucher.valueAggregate.into()), payer: voucher.payer.to_vec(), diff --git a/tap_aggregator/src/server.rs b/tap_aggregator/src/server.rs index 8499b2d8..63aa6dbd 100644 --- a/tap_aggregator/src/server.rs +++ b/tap_aggregator/src/server.rs @@ -1,7 +1,7 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::{collections::HashSet, str::FromStr}; +use std::{collections::HashSet, fmt::Debug, str::FromStr}; use anyhow::Result; use axum::{error_handling::HandleError, routing::post_service, BoxError, Router}; @@ -273,7 +273,7 @@ impl v2::tap_aggregator_server::TapAggregator for RpcImpl { produce_kafka_records( kafka, &res.message.payer, - &res.message.allocationId, + &res.message.collectionId, res.message.valueAggregate, ); } @@ -479,14 +479,14 @@ fn create_json_rpc_service( Ok((handle, server_handle)) } -fn produce_kafka_records( +fn produce_kafka_records( kafka: &rdkafka::producer::ThreadedProducer, sender: &Address, - allocation: &Address, + key_fragment: &K, aggregated_value: u128, ) { let topic = "gateway_ravs"; - let key = format!("{sender:?}:{allocation:?}"); + let key = format!("{sender:?}:{key_fragment:?}"); let payload = aggregated_value.to_string(); let result = kafka.send( rdkafka::producer::BaseRecord::to(topic) diff --git a/tap_aggregator/tests/aggregate_v1_and_v2.rs b/tap_aggregator/tests/aggregate_v1_and_v2.rs index d4cea84e..3ea4d9b2 100644 --- a/tap_aggregator/tests/aggregate_v1_and_v2.rs +++ b/tap_aggregator/tests/aggregate_v1_and_v2.rs @@ -13,7 +13,7 @@ use tap_aggregator::{ use tap_core::{signed_message::Eip712SignedMessage, tap_eip712_domain}; use tap_graph::{v2::Receipt as ReceiptV2, Receipt as ReceiptV1}; use thegraph_core::alloy::{ - primitives::{address, Address}, + primitives::{address, Address, FixedBytes}, signers::local::PrivateKeySigner, }; use tonic::codec::CompressionEncoding; @@ -51,6 +51,10 @@ async fn aggregation_test() { .send_compressed(CompressionEncoding::Zstd); let allocation_id = address!("abababababababababababababababababababab"); + let mut padded = [0u8; 32]; + padded[12..].copy_from_slice(allocation_id.as_slice()); + + let collection_id = FixedBytes::<32>::from(padded); // Create receipts let mut receipts = Vec::new(); @@ -85,7 +89,7 @@ async fn aggregation_test() { receipts.push( Eip712SignedMessage::new( &domain_separator, - ReceiptV2::new(allocation_id, payer, data_service, service_provider, value) + ReceiptV2::new(collection_id, payer, data_service, service_provider, value) .unwrap(), &wallet, ) diff --git a/tap_graph/src/v2/rav.rs b/tap_graph/src/v2/rav.rs index b56a7f8c..ca58b20b 100644 --- a/tap_graph/src/v2/rav.rs +++ b/tap_graph/src/v2/rav.rs @@ -13,7 +13,7 @@ use tap_receipt::{ ReceiptWithState, WithValueAndTimestamp, }; use thegraph_core::alloy::{ - primitives::{Address, Bytes}, + primitives::{Address, Bytes, FixedBytes}, sol, }; @@ -28,8 +28,8 @@ sol! { /// We use camelCase for field names to match the Ethereum ABI encoding #[derive(Debug, Serialize, Deserialize, Eq, PartialEq)] struct ReceiptAggregateVoucher { - /// Unique allocation id this RAV belongs to - address allocationId; + /// Unique collection id this RAV belongs to + bytes32 collectionId; // The address of the payer the RAV was issued by address payer; // The address of the data service the RAV was issued to @@ -55,7 +55,7 @@ impl ReceiptAggregateVoucher { /// Returns [`Error::AggregateOverflow`] if any receipt value causes aggregate /// value to overflow pub fn aggregate_receipts( - allocation_id: Address, + collection_id: FixedBytes<32>, payer: Address, data_service: Address, service_provider: Address, @@ -82,7 +82,7 @@ impl ReceiptAggregateVoucher { } Ok(Self { - allocationId: allocation_id, + collectionId: collection_id, timestampNs: timestamp_max, valueAggregate: value_aggregate, payer, @@ -101,7 +101,7 @@ impl Aggregate for ReceiptAggregateVoucher { if receipts.is_empty() { return Err(AggregationError::NoValidReceiptsForRavRequest); } - let allocation_id = receipts[0].signed_receipt().message.allocation_id; + let collection_id = receipts[0].signed_receipt().message.collection_id; let payer = receipts[0].signed_receipt().message.payer; let data_service = receipts[0].signed_receipt().message.data_service; let service_provider = receipts[0].signed_receipt().message.service_provider; @@ -110,7 +110,7 @@ impl Aggregate for ReceiptAggregateVoucher { .map(|rx_receipt| rx_receipt.signed_receipt().clone()) .collect::>(); ReceiptAggregateVoucher::aggregate_receipts( - allocation_id, + collection_id, payer, data_service, service_provider, diff --git a/tap_graph/src/v2/receipt.rs b/tap_graph/src/v2/receipt.rs index d38248ef..34d66053 100644 --- a/tap_graph/src/v2/receipt.rs +++ b/tap_graph/src/v2/receipt.rs @@ -9,7 +9,10 @@ use rand::{rng, Rng}; use serde::{Deserialize, Serialize}; use tap_eip712_message::Eip712SignedMessage; use tap_receipt::WithValueAndTimestamp; -use thegraph_core::alloy::{primitives::Address, sol}; +use thegraph_core::alloy::{ + primitives::{Address, FixedBytes}, + sol, +}; /// A signed receipt message pub type SignedReceipt = Eip712SignedMessage; @@ -18,8 +21,8 @@ sol! { /// Holds information needed for promise of payment signed with ECDSA #[derive(Debug, Serialize, Deserialize, Eq, PartialEq)] struct Receipt { - /// Unique allocation id this receipt belongs to - address allocation_id; + /// Unique collection id this receipt belongs to + bytes32 collection_id; // The address of the payer the RAV was issued by address payer; @@ -43,7 +46,7 @@ fn get_current_timestamp_u64_ns() -> Result { impl Receipt { /// Returns a receipt with provided values pub fn new( - allocation_id: Address, + collection_id: FixedBytes<32>, payer: Address, data_service: Address, service_provider: Address, @@ -52,7 +55,7 @@ impl Receipt { let timestamp_ns = get_current_timestamp_u64_ns()?; let nonce = rng().random::(); Ok(Self { - allocation_id, + collection_id, payer, data_service, service_provider, @@ -78,13 +81,13 @@ mod receipt_unit_test { use std::time::{SystemTime, UNIX_EPOCH}; use rstest::*; - use thegraph_core::alloy::primitives::address; + use thegraph_core::alloy::primitives::{address, fixed_bytes}; use super::*; #[fixture] - fn allocation_id() -> Address { - address!("1234567890abcdef1234567890abcdef12345678") + fn collection_id() -> FixedBytes<32> { + fixed_bytes!("deaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddead") } #[fixture] @@ -109,18 +112,18 @@ mod receipt_unit_test { #[fixture] fn receipt( - allocation_id: Address, + collection_id: FixedBytes<32>, payer: Address, data_service: Address, service_provider: Address, value: u128, ) -> Receipt { - Receipt::new(allocation_id, payer, data_service, service_provider, value).unwrap() + Receipt::new(collection_id, payer, data_service, service_provider, value).unwrap() } #[rstest] - fn test_new_receipt(allocation_id: Address, value: u128, receipt: Receipt) { - assert_eq!(receipt.allocation_id, allocation_id); + fn test_new_receipt(collection_id: FixedBytes<32>, value: u128, receipt: Receipt) { + assert_eq!(receipt.collection_id, collection_id); assert_eq!(receipt.value, value); // Check that the timestamp is within a reasonable range