diff --git a/tap_aggregator/src/grpc.rs b/tap_aggregator/src/grpc.rs index bcc7e274..03c27ea9 100644 --- a/tap_aggregator/src/grpc.rs +++ b/tap_aggregator/src/grpc.rs @@ -114,7 +114,7 @@ impl From for Uint128 { impl RavRequest { pub fn new( receipts: Vec, - previous_rav: Option, + previous_rav: Option, ) -> Self { Self { receipts: receipts.into_iter().map(Into::into).collect(), @@ -124,8 +124,8 @@ impl RavRequest { } impl RavResponse { - pub fn signed_rav(mut self) -> anyhow::Result { - let signed_rav: tap_core::rav::SignedRAV = self + pub fn signed_rav(mut self) -> anyhow::Result { + let signed_rav: tap_core::rav::SignedRav = self .rav .take() .ok_or(anyhow!("Couldn't find rav"))? diff --git a/tap_aggregator/tests/aggregate_test.rs b/tap_aggregator/tests/aggregate_test.rs index 8c16436c..86bb23f6 100644 --- a/tap_aggregator/tests/aggregate_test.rs +++ b/tap_aggregator/tests/aggregate_test.rs @@ -64,11 +64,11 @@ async fn aggregation_test() { let rav_request = RavRequest::new(receipts.clone(), None); let res = client.aggregate_receipts(rav_request).await.unwrap(); - let signed_rav: tap_core::rav::SignedRAV = res.into_inner().signed_rav().unwrap(); + let signed_rav: tap_core::rav::SignedRav = res.into_inner().signed_rav().unwrap(); let sender_aggregator = HttpClientBuilder::default().build(&endpoint).unwrap(); - let previous_rav: Option = None; + let previous_rav: Option = None; let response: JsonRpcResponse> = sender_aggregator .request( diff --git a/tap_core/src/error.rs b/tap_core/src/error.rs index 592a6921..17d5ac82 100644 --- a/tap_core/src/error.rs +++ b/tap_core/src/error.rs @@ -9,7 +9,7 @@ use std::result::Result as StdResult; use alloy::primitives::{Address, SignatureError}; use thiserror::Error as ThisError; -use crate::{rav::ReceiptAggregateVoucher, receipt::ReceiptError}; +use crate::receipt::ReceiptError; /// Error type for the TAP protocol #[derive(ThisError, Debug)] @@ -36,10 +36,10 @@ pub enum Error { }, /// Error when the received RAV does not match the expected RAV - #[error("Received RAV does not match expexted RAV")] - InvalidReceivedRAV { - received_rav: ReceiptAggregateVoucher, - expected_rav: ReceiptAggregateVoucher, + #[error("Received RAV does not match expected RAV")] + InvalidReceivedRav { + received_rav: String, + expected_rav: String, }, /// Generic error from the adapter #[error("Error from adapter.\n Caused by: {source_error}")] diff --git a/tap_core/src/manager/adapters/rav.rs b/tap_core/src/manager/adapters/rav.rs index 3e719b98..8c40a625 100644 --- a/tap_core/src/manager/adapters/rav.rs +++ b/tap_core/src/manager/adapters/rav.rs @@ -1,9 +1,10 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 +use alloy::sol_types::SolStruct; use async_trait::async_trait; -use crate::rav::SignedRAV; +use crate::signed_message::EIP712SignedMessage; /// Stores the latest RAV in the storage. /// @@ -12,7 +13,7 @@ use crate::rav::SignedRAV; /// For example code see [crate::manager::context::memory::RAVStorage] #[async_trait] -pub trait RAVStore { +pub trait RavStore { /// Defines the user-specified error type. /// /// This error type should implement the `Error` and `Debug` traits from @@ -25,7 +26,7 @@ pub trait RAVStore { /// This method should be implemented to store the most recent validated /// `SignedRAV` into your chosen storage system. Any errors that occur /// during this process should be captured and returned as an `AdapterError`. - async fn update_last_rav(&self, rav: SignedRAV) -> Result<(), Self::AdapterError>; + async fn update_last_rav(&self, rav: EIP712SignedMessage) -> Result<(), Self::AdapterError>; } /// Reads the RAV from storage @@ -35,7 +36,7 @@ pub trait RAVStore { /// For example code see [crate::manager::context::memory::RAVStorage] #[async_trait] -pub trait RAVRead { +pub trait RavRead { /// Defines the user-specified error type. /// /// This error type should implement the `Error` and `Debug` traits from @@ -46,5 +47,5 @@ pub trait RAVRead { /// Retrieves the latest `SignedRAV` from the storage. /// /// If no `SignedRAV` is available, this method should return `None`. - async fn last_rav(&self) -> Result, Self::AdapterError>; + async fn last_rav(&self) -> Result>, Self::AdapterError>; } diff --git a/tap_core/src/manager/context/memory.rs b/tap_core/src/manager/context/memory.rs index 7594ac41..24673b41 100644 --- a/tap_core/src/manager/context/memory.rs +++ b/tap_core/src/manager/context/memory.rs @@ -17,7 +17,7 @@ use async_trait::async_trait; use crate::{ manager::adapters::*, - rav::SignedRAV, + rav::{ReceiptAggregateVoucher, SignedRav}, receipt::{checks::StatefulTimestampCheck, state::Checking, ReceiptWithState, SignedReceipt}, signed_message::MessageId, }; @@ -25,7 +25,7 @@ use crate::{ pub type EscrowStorage = Arc>>; pub type QueryAppraisals = Arc>>; pub type ReceiptStorage = Arc>>>; -pub type RAVStorage = Arc>>; +pub type RAVStorage = Arc>>; use thiserror::Error; @@ -125,10 +125,10 @@ impl InMemoryContext { } #[async_trait] -impl RAVStore for InMemoryContext { +impl RavStore for InMemoryContext { type AdapterError = InMemoryError; - async fn update_last_rav(&self, rav: SignedRAV) -> Result<(), Self::AdapterError> { + async fn update_last_rav(&self, rav: SignedRav) -> Result<(), Self::AdapterError> { let mut rav_storage = self.rav_storage.write().unwrap(); let timestamp = rav.message.timestampNs; *rav_storage = Some(rav); @@ -138,10 +138,10 @@ impl RAVStore for InMemoryContext { } #[async_trait] -impl RAVRead for InMemoryContext { +impl RavRead for InMemoryContext { type AdapterError = InMemoryError; - async fn last_rav(&self) -> Result, Self::AdapterError> { + async fn last_rav(&self) -> Result, Self::AdapterError> { Ok(self.rav_storage.read().unwrap().clone()) } } diff --git a/tap_core/src/manager/mod.rs b/tap_core/src/manager/mod.rs index fb541f51..7d34df82 100644 --- a/tap_core/src/manager/mod.rs +++ b/tap_core/src/manager/mod.rs @@ -42,6 +42,7 @@ //! ReceiptError, //! Context //! }, +//! rav::ReceiptAggregateVoucher, //! manager::{ //! Manager, //! adapters::ReceiptStore @@ -70,7 +71,7 @@ //! //! let receipt = EIP712SignedMessage::new(&domain_separator, message, &wallet).unwrap(); //! -//! let manager = Manager::new(domain_separator, MyContext, CheckList::empty()); +//! let manager = Manager::<_, _, ReceiptAggregateVoucher>::new(domain_separator, MyContext, CheckList::empty()); //! manager.verify_and_store_receipt(&Context::new(), receipt).await.unwrap() //! # } //! ``` diff --git a/tap_core/src/manager/tap_manager.rs b/tap_core/src/manager/tap_manager.rs index f956455e..39f0dae1 100644 --- a/tap_core/src/manager/tap_manager.rs +++ b/tap_core/src/manager/tap_manager.rs @@ -1,23 +1,25 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 -use alloy::dyn_abi::Eip712Domain; +use std::marker::PhantomData; + +use alloy::{dyn_abi::Eip712Domain, sol_types::SolStruct}; use super::adapters::{ - RAVRead, RAVStore, ReceiptDelete, ReceiptRead, ReceiptStore, SignatureChecker, + RavRead, RavStore, ReceiptDelete, ReceiptRead, ReceiptStore, SignatureChecker, }; use crate::{ - rav::{RavRequest, ReceiptAggregateVoucher, SignedRAV}, + rav::{Aggregate, RavRequest}, receipt::{ checks::{CheckBatch, CheckList, TimestampCheck, UniqueCheck}, state::{Checked, Failed}, - Context, ReceiptError, ReceiptWithState, SignedReceipt, WithUniqueId, - WithValueAndTimestamp, + Context, ReceiptError, ReceiptWithState, WithUniqueId, WithValueAndTimestamp, }, + signed_message::EIP712SignedMessage, Error, }; -pub struct Manager { +pub struct Manager { /// Context that implements adapters context: E, @@ -27,9 +29,11 @@ pub struct Manager { /// Struct responsible for doing checks for receipt. Ownership stays with manager allowing manager /// to update configuration ( like minimum timestamp ). domain_separator: Eip712Domain, + + _phantom: PhantomData, } -impl Manager { +impl Manager { /// Creates new manager with provided `adapters`, any receipts received by this manager /// will complete all `required_checks` before being accepted or declined from RAV. /// `starting_min_timestamp` will be used as min timestamp until the first RAV request is created. @@ -43,13 +47,15 @@ impl Manager { context, domain_separator, checks: checks.into(), + _phantom: PhantomData, } } } -impl Manager +impl Manager where - E: RAVStore + SignatureChecker, + E: RavStore + SignatureChecker, + Rav: SolStruct + PartialEq + Sync + std::fmt::Debug, { /// Verify `signed_rav` matches all values on `expected_rav`, and that `signed_rav` has a valid signer. /// @@ -59,17 +65,17 @@ where /// pub async fn verify_and_store_rav( &self, - expected_rav: ReceiptAggregateVoucher, - signed_rav: SignedRAV, + expected_rav: Rav, + signed_rav: EIP712SignedMessage, ) -> std::result::Result<(), Error> { self.context .check_signature(&signed_rav, &self.domain_separator) .await?; if signed_rav.message != expected_rav { - return Err(Error::InvalidReceivedRAV { - received_rav: signed_rav.message, - expected_rav, + return Err(Error::InvalidReceivedRav { + received_rav: format!("{:?}", signed_rav.message), + expected_rav: format!("{:?}", expected_rav), }); } @@ -84,11 +90,12 @@ where } } -impl Manager +impl Manager where - E: RAVRead, + E: RavRead, + Rav: SolStruct, { - async fn get_previous_rav(&self) -> Result, Error> { + async fn get_previous_rav(&self) -> Result>, Error> { let previous_rav = self .context .last_rav() @@ -100,7 +107,7 @@ where } } -impl Manager +impl Manager where E: ReceiptRead + SignatureChecker, Rcpt: WithUniqueId + WithValueAndTimestamp + Sync, @@ -162,11 +169,11 @@ where } } -// TODO make create_rav_request generic over receipt -impl Manager +impl Manager where - E: ReceiptRead + RAVRead + SignatureChecker, - //Rcpt: WithUniqueId + WithValueAndTimestamp + Sync, + E: ReceiptRead + RavRead + SignatureChecker, + Rav: SolStruct + WithValueAndTimestamp + Clone + Aggregate, + Rcpt: WithUniqueId + WithValueAndTimestamp + Sync, { /// Completes remaining checks on all receipts up to /// (current time - `timestamp_buffer_ns`). Returns them in two lists @@ -188,18 +195,18 @@ where ctx: &Context, timestamp_buffer_ns: u64, receipts_limit: Option, - ) -> Result, Error> { + ) -> Result, Error> { let previous_rav = self.get_previous_rav().await?; let min_timestamp_ns = previous_rav .as_ref() - .map(|rav| rav.message.timestampNs + 1) + .map(|rav| rav.message.timestamp_ns() + 1) .unwrap_or(0); let (valid_receipts, invalid_receipts) = self .collect_receipts(ctx, timestamp_buffer_ns, min_timestamp_ns, receipts_limit) .await?; - let expected_rav = Self::generate_expected_rav(&valid_receipts, previous_rav.clone()); + let expected_rav = Rav::aggregate_receipts(&valid_receipts, previous_rav.clone()); Ok(RavRequest { valid_receipts, @@ -208,30 +215,12 @@ where expected_rav, }) } - - fn generate_expected_rav( - receipts: &[ReceiptWithState], - previous_rav: Option, - ) -> Result { - if receipts.is_empty() { - return Err(Error::NoValidReceiptsForRavRequest); - } - let allocation_id = receipts[0].signed_receipt().message.allocation_id; - let receipts = receipts - .iter() - .map(|rx_receipt| rx_receipt.signed_receipt().clone()) - .collect::>(); - ReceiptAggregateVoucher::aggregate_receipts( - allocation_id, - receipts.as_slice(), - previous_rav, - ) - } } -impl Manager +impl Manager where - E: ReceiptDelete + RAVRead, + E: ReceiptDelete + RavRead, + Rav: SolStruct + WithValueAndTimestamp, { /// Removes obsolete receipts from storage. Obsolete receipts are receipts /// that are older than the last RAV, and therefore already aggregated into the RAV. @@ -247,7 +236,7 @@ where match self.get_previous_rav().await? { Some(last_rav) => { self.context - .remove_receipts_in_timestamp_range(..=last_rav.message.timestampNs) + .remove_receipts_in_timestamp_range(..=last_rav.message.timestamp_ns()) .await .map_err(|err| Error::AdapterError { source_error: anyhow::Error::new(err), @@ -259,7 +248,7 @@ where } } -impl Manager +impl Manager where E: ReceiptStore, { diff --git a/tap_core/src/rav.rs b/tap_core/src/rav.rs index 2fbaeccb..03222393 100644 --- a/tap_core/src/rav.rs +++ b/tap_core/src/rav.rs @@ -41,15 +41,28 @@ mod request; use std::cmp; -use alloy::{primitives::Address, sol}; +use alloy::{primitives::Address, sol, sol_types::SolStruct}; use serde::{Deserialize, Serialize}; -use crate::{receipt::Receipt, signed_message::EIP712SignedMessage, Error}; +use crate::{ + receipt::{state::Checked, Receipt, ReceiptWithState, SignedReceipt, WithValueAndTimestamp}, + signed_message::EIP712SignedMessage, + Error, +}; /// EIP712 signed message for ReceiptAggregateVoucher -pub type SignedRAV = EIP712SignedMessage; +pub type SignedRav = EIP712SignedMessage; pub use request::RavRequest; +pub trait Aggregate: SolStruct { + /// Aggregates a batch of validated receipts with optional validated previous RAV, + /// returning a new RAV if all provided items are valid or an error if not. + fn aggregate_receipts( + receipts: &[ReceiptWithState], + previous_rav: Option>, + ) -> Result; +} + sol! { /// Holds information needed for promise of payment signed with ECDSA /// @@ -106,3 +119,34 @@ impl ReceiptAggregateVoucher { }) } } + +impl Aggregate for ReceiptAggregateVoucher { + fn aggregate_receipts( + receipts: &[ReceiptWithState], + previous_rav: Option>, + ) -> Result { + if receipts.is_empty() { + return Err(Error::NoValidReceiptsForRavRequest); + } + let allocation_id = receipts[0].signed_receipt().message.allocation_id; + let receipts = receipts + .iter() + .map(|rx_receipt| rx_receipt.signed_receipt().clone()) + .collect::>(); + ReceiptAggregateVoucher::aggregate_receipts( + allocation_id, + receipts.as_slice(), + previous_rav, + ) + } +} + +impl WithValueAndTimestamp for ReceiptAggregateVoucher { + fn value(&self) -> u128 { + self.valueAggregate + } + + fn timestamp_ns(&self) -> u64 { + self.timestampNs + } +} diff --git a/tap_core/src/rav/request.rs b/tap_core/src/rav/request.rs index 77ee35cf..8d23e75c 100644 --- a/tap_core/src/rav/request.rs +++ b/tap_core/src/rav/request.rs @@ -1,24 +1,26 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 +use alloy::sol_types::SolStruct; + use crate::{ - rav::{ReceiptAggregateVoucher, SignedRAV}, receipt::{ state::{Checked, Failed}, ReceiptWithState, }, + signed_message::EIP712SignedMessage, Error, }; /// Request to `tap_aggregator` to aggregate receipts into a Signed RAV. #[derive(Debug)] -pub struct RavRequest { +pub struct RavRequest { /// List of checked and reserved receipts to aggregate pub valid_receipts: Vec>, /// Optional previous RAV to aggregate with - pub previous_rav: Option, + pub previous_rav: Option>, /// List of failed receipt used to log invalid receipts pub invalid_receipts: Vec>, /// Expected RAV to be created - pub expected_rav: Result, + pub expected_rav: Result, } diff --git a/tap_core/tests/manager_test.rs b/tap_core/tests/manager_test.rs index d949acdf..7b035635 100644 --- a/tap_core/tests/manager_test.rs +++ b/tap_core/tests/manager_test.rs @@ -128,7 +128,8 @@ async fn manager_verify_and_store_varying_initial_checks( signer, .. } = context; - let manager = Manager::new(domain_separator.clone(), context, checks); + let manager = + Manager::<_, _, ReceiptAggregateVoucher>::new(domain_separator.clone(), context, checks); let value = 20u128; let signed_receipt = EIP712SignedMessage::new( diff --git a/tap_core/tests/rav_test.rs b/tap_core/tests/rav_test.rs index 1b96edca..ad2c58d9 100644 --- a/tap_core/tests/rav_test.rs +++ b/tap_core/tests/rav_test.rs @@ -16,7 +16,7 @@ use alloy::{ use rstest::*; use tap_core::{ manager::{ - adapters::{RAVRead, RAVStore}, + adapters::{RavRead, RavStore}, context::memory::InMemoryContext, }, rav::ReceiptAggregateVoucher, diff --git a/tap_integration_tests/tests/indexer_mock.rs b/tap_integration_tests/tests/indexer_mock.rs index 4aa9b430..57069858 100644 --- a/tap_integration_tests/tests/indexer_mock.rs +++ b/tap_integration_tests/tests/indexer_mock.rs @@ -18,10 +18,10 @@ use jsonrpsee_core::client::ClientT; use tap_aggregator::jsonrpsee_helpers; use tap_core::{ manager::{ - adapters::{RAVRead, RAVStore, ReceiptRead, ReceiptStore, SignatureChecker}, + adapters::{RavRead, RavStore, ReceiptRead, ReceiptStore, SignatureChecker}, Manager, }, - rav::SignedRAV, + rav::{ReceiptAggregateVoucher, SignedRav}, receipt::{checks::CheckList, Context, SignedReceipt}, }; /// Rpc trait represents a JSON-RPC server that has a single async method `request`. @@ -44,9 +44,9 @@ pub trait Rpc { /// threshold is a limit to which receipt_count can increment, after reaching which RAV request is triggered. /// aggregator_client is an HTTP client used for making JSON-RPC requests to another server. pub struct RpcManager { - manager: Arc>, // Manager object reference counted with an Arc - receipt_count: Arc, // Thread-safe atomic counter for receipts - threshold: u64, // The count at which a RAV request will be triggered + manager: Arc>, // Manager object reference counted with an Arc + receipt_count: Arc, // Thread-safe atomic counter for receipts + threshold: u64, // The count at which a RAV request will be triggered aggregator_client: (HttpClient, String), // HTTP client for sending requests to the aggregator server } @@ -66,7 +66,7 @@ where aggregate_server_api_version: String, ) -> Result { Ok(Self { - manager: Arc::new(Manager::::new( + manager: Arc::new(Manager::::new( domain_separator, context, required_checks, @@ -86,8 +86,8 @@ impl RpcServer for RpcManager where E: ReceiptStore + ReceiptRead - + RAVStore - + RAVRead + + RavStore + + RavRead + SignatureChecker + Send + Sync @@ -153,8 +153,8 @@ pub async fn run_server( where E: ReceiptStore + ReceiptRead - + RAVStore - + RAVRead + + RavStore + + RavRead + SignatureChecker + Clone + Send @@ -184,13 +184,16 @@ where // request_rav function creates a request for aggregate receipts (RAV), sends it to another server and verifies the result. async fn request_rav( - manager: &Arc>, + manager: &Arc>, time_stamp_buffer: u64, // Buffer for timestamping, see tap_core for details aggregator_client: &(HttpClient, String), // HttpClient for making requests to the tap_aggregator server threshold: usize, ) -> Result<()> where - E: ReceiptRead + RAVRead + RAVStore + SignatureChecker, + E: ReceiptRead + + RavRead + + RavStore + + SignatureChecker, { // Create the aggregate_receipts request params let rav_request = manager @@ -209,7 +212,7 @@ where ); // Call the aggregate_receipts method on the other server - let remote_rav_result: jsonrpsee_helpers::JsonRpcResponse = aggregator_client + let remote_rav_result: jsonrpsee_helpers::JsonRpcResponse = aggregator_client .0 .request("aggregate_receipts", params) .await?; diff --git a/tap_integration_tests/tests/showcase.rs b/tap_integration_tests/tests/showcase.rs index fde278d0..bc751c31 100644 --- a/tap_integration_tests/tests/showcase.rs +++ b/tap_integration_tests/tests/showcase.rs @@ -26,7 +26,7 @@ use rstest::*; use tap_aggregator::{jsonrpsee_helpers, server as agg_server}; use tap_core::{ manager::context::memory::{checks::get_full_list_of_checks, *}, - rav::SignedRAV, + rav::SignedRav, receipt::{ checks::{CheckList, StatefulTimestampCheck}, Receipt, SignedReceipt, @@ -722,7 +722,7 @@ async fn test_tap_aggregator_rav_timestamp_cuttoff( let second_batch = &requests[receipt_threshold_1 as usize..2 * receipt_threshold_1 as usize]; let params = rpc_params!(&aggregate_server_api_version(), &first_batch, None::<()>); - let first_rav_response: jsonrpsee_helpers::JsonRpcResponse = + let first_rav_response: jsonrpsee_helpers::JsonRpcResponse = client.request("aggregate_receipts", params).await?; let params = rpc_params!( @@ -731,7 +731,7 @@ async fn test_tap_aggregator_rav_timestamp_cuttoff( first_rav_response.data ); let second_rav_response: Result< - jsonrpsee_helpers::JsonRpcResponse, + jsonrpsee_helpers::JsonRpcResponse, jsonrpsee::core::ClientError, > = client.request("aggregate_receipts", params).await; assert!( @@ -747,7 +747,7 @@ async fn test_tap_aggregator_rav_timestamp_cuttoff( let second_batch = &requests[receipt_threshold_1 as usize..2 * receipt_threshold_1 as usize]; let params = rpc_params!(&aggregate_server_api_version(), &first_batch, None::<()>); - let first_rav_response: jsonrpsee_helpers::JsonRpcResponse = + let first_rav_response: jsonrpsee_helpers::JsonRpcResponse = client.request("aggregate_receipts", params).await?; let params = rpc_params!( @@ -755,7 +755,7 @@ async fn test_tap_aggregator_rav_timestamp_cuttoff( &second_batch, first_rav_response.data ); - let second_rav_response: jsonrpsee_helpers::JsonRpcResponse = + let second_rav_response: jsonrpsee_helpers::JsonRpcResponse = client.request("aggregate_receipts", params).await?; // Compute the expected aggregate value and check that it matches the latest RAV.